| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.io; | 5 part of dart.io; |
| 6 | 6 |
| 7 // Read the file in blocks of size 64k. | 7 // Read the file in blocks of size 64k. |
| 8 const int _BLOCK_SIZE = 64 * 1024; | 8 const int _BLOCK_SIZE = 64 * 1024; |
| 9 | 9 |
| 10 | 10 |
| 11 class _FileStream extends Stream<List<int>> { | 11 class _FileStream extends Stream<List<int>> { |
| 12 // Stream controller. | 12 // Stream controller. |
| 13 StreamController<List<int>> _controller; | 13 StreamController<List<int>> _controller; |
| 14 | 14 |
| 15 // Information about the underlying file. | 15 // Information about the underlying file. |
| 16 String _path; | 16 String _path; |
| 17 RandomAccessFile _openedFile; | 17 RandomAccessFile _openedFile; |
| 18 int _position; | 18 int _position; |
| 19 int _end; | 19 int _end; |
| 20 final Completer _closeCompleter = new Completer(); | 20 final Completer _closeCompleter = new Completer(); |
| 21 | 21 |
| 22 // Has the stream been paused or unsubscribed? | 22 // Has the stream been paused or unsubscribed? |
| 23 bool _paused = false; | |
| 24 bool _unsubscribed = false; | 23 bool _unsubscribed = false; |
| 25 | 24 |
| 26 // Is there a read currently in progress? | 25 // Is there a read currently in progress? |
| 27 bool _readInProgress = false; | 26 bool _readInProgress = true; |
| 28 bool _closed = false; | 27 bool _closed = false; |
| 29 | 28 |
| 30 // Block read but not yet send because stream is paused. | 29 bool _atEnd = false; |
| 31 List<int> _currentBlock; | |
| 32 | 30 |
| 33 _FileStream(this._path, this._position, this._end) { | 31 _FileStream(this._path, this._position, this._end) { |
| 34 _setupController(); | 32 if (_position == null) _position = 0; |
| 35 } | 33 } |
| 36 | 34 |
| 37 _FileStream.forStdin() : _position = 0 { | 35 _FileStream.forStdin() : _position = 0; |
| 38 _setupController(); | |
| 39 } | |
| 40 | 36 |
| 41 StreamSubscription<List<int>> listen(void onData(List<int> event), | 37 StreamSubscription<List<int>> listen(void onData(List<int> event), |
| 42 {Function onError, | 38 {Function onError, |
| 43 void onDone(), | 39 void onDone(), |
| 44 bool cancelOnError}) { | 40 bool cancelOnError}) { |
| 41 _setupController(); |
| 45 return _controller.stream.listen(onData, | 42 return _controller.stream.listen(onData, |
| 46 onError: onError, | 43 onError: onError, |
| 47 onDone: onDone, | 44 onDone: onDone, |
| 48 cancelOnError: cancelOnError); | 45 cancelOnError: cancelOnError); |
| 49 } | 46 } |
| 50 | 47 |
| 51 void _setupController() { | 48 void _setupController() { |
| 52 _controller = new StreamController<List<int>>(sync: true, | 49 _controller = new StreamController<List<int>>(sync: true, |
| 53 onListen: _start, | 50 onListen: _start, |
| 54 onPause: () => _paused = true, | 51 onResume: _readBlock, |
| 55 onResume: _resume, | |
| 56 onCancel: () { | 52 onCancel: () { |
| 57 _unsubscribed = true; | 53 _unsubscribed = true; |
| 58 return _closeFile(); | 54 return _closeFile(); |
| 59 }); | 55 }); |
| 60 } | 56 } |
| 61 | 57 |
| 62 Future _closeFile() { | 58 Future _closeFile() { |
| 63 if (_readInProgress || _closed) { | 59 if (_readInProgress || _closed) { |
| 64 return _closeCompleter.future; | 60 return _closeCompleter.future; |
| 65 } | 61 } |
| 66 _closed = true; | 62 _closed = true; |
| 63 |
| 67 void done() { | 64 void done() { |
| 68 _closeCompleter.complete(); | 65 _closeCompleter.complete(); |
| 69 _controller.close(); | 66 _controller.close(); |
| 70 } | 67 } |
| 71 if (_openedFile != null) { | 68 |
| 72 _openedFile.close() | 69 _openedFile.close() |
| 73 .catchError(_controller.addError) | 70 .catchError(_controller.addError) |
| 74 .whenComplete(done); | 71 .whenComplete(done); |
| 75 _openedFile = null; | |
| 76 } else { | |
| 77 done(); | |
| 78 } | |
| 79 return _closeCompleter.future; | 72 return _closeCompleter.future; |
| 80 } | 73 } |
| 81 | 74 |
| 82 void _readBlock() { | 75 void _readBlock() { |
| 83 // Don't start a new read if one is already in progress. | 76 // Don't start a new read if one is already in progress. |
| 84 if (_readInProgress) return; | 77 if (_readInProgress) return; |
| 78 if (_atEnd) { |
| 79 _closeFile(); |
| 80 return; |
| 81 } |
| 85 _readInProgress = true; | 82 _readInProgress = true; |
| 86 int readBytes = _BLOCK_SIZE; | 83 int readBytes = _BLOCK_SIZE; |
| 87 if (_end != null) { | 84 if (_end != null) { |
| 88 readBytes = min(readBytes, _end - _position); | 85 readBytes = min(readBytes, _end - _position); |
| 89 if (readBytes < 0) { | 86 if (readBytes < 0) { |
| 90 _readInProgress = false; | 87 _readInProgress = false; |
| 91 if (!_unsubscribed) { | 88 if (!_unsubscribed) { |
| 92 _controller.addError(new RangeError("Bad end position: $_end")); | 89 _controller.addError(new RangeError("Bad end position: $_end")); |
| 93 _closeFile(); | 90 _closeFile(); |
| 94 _unsubscribed = true; | 91 _unsubscribed = true; |
| 95 } | 92 } |
| 96 return; | 93 return; |
| 97 } | 94 } |
| 98 } | 95 } |
| 99 _openedFile.read(readBytes) | 96 _openedFile.read(readBytes) |
| 100 .whenComplete(() { | |
| 101 _readInProgress = false; | |
| 102 }) | |
| 103 .then((block) { | 97 .then((block) { |
| 104 if (_unsubscribed) { | 98 if (_unsubscribed) { |
| 105 _closeFile(); | 99 _closeFile(); |
| 106 return; | 100 return; |
| 107 } | 101 } |
| 108 if (block.length == 0) { | 102 _readInProgress = false; |
| 109 if (!_unsubscribed) { | 103 _position += block.length; |
| 110 _closeFile(); | 104 if (block.length < readBytes || |
| 111 _unsubscribed = true; | 105 (_end != null && _position == _end)) { |
| 112 } | 106 _atEnd = true; |
| 113 return; | |
| 114 } | 107 } |
| 115 _position += block.length; | 108 _controller.add(block); |
| 116 if (_paused) { | 109 if (!_controller.isPaused) { |
| 117 _currentBlock = block; | |
| 118 } else { | |
| 119 _controller.add(block); | |
| 120 _readBlock(); | 110 _readBlock(); |
| 121 } | 111 } |
| 122 }) | 112 }) |
| 123 .catchError((e) { | 113 .catchError((e, s) { |
| 124 if (!_unsubscribed) { | 114 if (!_unsubscribed) { |
| 125 _controller.addError(e); | 115 _controller.addError(e, s); |
| 126 _closeFile(); | 116 _closeFile(); |
| 127 _unsubscribed = true; | 117 _unsubscribed = true; |
| 128 } | 118 } |
| 129 }); | 119 }); |
| 130 } | 120 } |
| 131 | 121 |
| 132 void _start() { | 122 void _start() { |
| 133 if (_position == null) { | 123 if (_position < 0) { |
| 134 _position = 0; | |
| 135 } else if (_position < 0) { | |
| 136 _controller.addError(new RangeError("Bad start position: $_position")); | 124 _controller.addError(new RangeError("Bad start position: $_position")); |
| 137 _controller.close(); | 125 _controller.close(); |
| 138 return; | 126 return; |
| 139 } | 127 } |
| 140 Future<RandomAccessFile> openFuture; | 128 |
| 129 void onReady(RandomAccessFile file) { |
| 130 _openedFile = file; |
| 131 _readInProgress = false; |
| 132 _readBlock(); |
| 133 } |
| 134 |
| 135 void onOpenFile(RandomAccessFile file) { |
| 136 if (_position > 0) { |
| 137 file.setPosition(_position) |
| 138 .then(onReady, onError: (e, s) { |
| 139 _controller.addError(e, s); |
| 140 _readInProgress = false; |
| 141 _closeFile(); |
| 142 }); |
| 143 } else { |
| 144 onReady(file); |
| 145 } |
| 146 } |
| 147 |
| 148 void openFailed(error, stackTrace) { |
| 149 _controller.addError(error, stackTrace); |
| 150 _controller.close(); |
| 151 _closeCompleter.complete(); |
| 152 } |
| 153 |
| 141 if (_path != null) { | 154 if (_path != null) { |
| 142 openFuture = new File(_path).open(mode: FileMode.READ); | 155 new File(_path).open(mode: FileMode.READ) |
| 156 .then(onOpenFile, onError: openFailed); |
| 143 } else { | 157 } else { |
| 144 openFuture = new Future.value(_File._openStdioSync(0)); | 158 try { |
| 159 onOpenFile(_File._openStdioSync(0)); |
| 160 } catch (e, s) { |
| 161 openFailed(e, s); |
| 162 } |
| 145 } | 163 } |
| 146 _readInProgress = true; | |
| 147 openFuture | |
| 148 .then((RandomAccessFile opened) { | |
| 149 _openedFile = opened; | |
| 150 if (_position > 0) { | |
| 151 return opened.setPosition(_position); | |
| 152 } | |
| 153 }) | |
| 154 .whenComplete(() { | |
| 155 _readInProgress = false; | |
| 156 }) | |
| 157 .then((_) => _readBlock()) | |
| 158 .catchError((e) { | |
| 159 _controller.addError(e); | |
| 160 _closeFile(); | |
| 161 }); | |
| 162 } | |
| 163 | |
| 164 void _resume() { | |
| 165 _paused = false; | |
| 166 if (_currentBlock != null) { | |
| 167 _controller.add(_currentBlock); | |
| 168 _currentBlock = null; | |
| 169 } | |
| 170 // Resume reading unless we are already done. | |
| 171 if (_openedFile != null) _readBlock(); | |
| 172 } | 164 } |
| 173 } | 165 } |
| 174 | 166 |
| 175 class _FileStreamConsumer extends StreamConsumer<List<int>> { | 167 class _FileStreamConsumer extends StreamConsumer<List<int>> { |
| 176 File _file; | 168 File _file; |
| 177 Future<RandomAccessFile> _openFuture; | 169 Future<RandomAccessFile> _openFuture; |
| 178 | 170 |
| 179 _FileStreamConsumer(File this._file, FileMode mode) { | 171 _FileStreamConsumer(File this._file, FileMode mode) { |
| 180 _openFuture = _file.open(mode: mode); | 172 _openFuture = _file.open(mode: mode); |
| 181 } | 173 } |
| (...skipping 738 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 920 void _checkAvailable() { | 912 void _checkAvailable() { |
| 921 if (_asyncDispatched) { | 913 if (_asyncDispatched) { |
| 922 throw new FileSystemException("An async operation is currently pending", | 914 throw new FileSystemException("An async operation is currently pending", |
| 923 path); | 915 path); |
| 924 } | 916 } |
| 925 if (closed) { | 917 if (closed) { |
| 926 throw new FileSystemException("File closed", path); | 918 throw new FileSystemException("File closed", path); |
| 927 } | 919 } |
| 928 } | 920 } |
| 929 } | 921 } |
| OLD | NEW |