OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 // Read the file in blocks of size 64k. | 7 // Read the file in blocks of size 64k. |
8 const int _BLOCK_SIZE = 64 * 1024; | 8 const int _BLOCK_SIZE = 64 * 1024; |
9 | 9 |
10 | 10 |
11 class _FileStream extends Stream<List<int>> { | 11 class _FileStream extends Stream<List<int>> { |
12 // Stream controller. | 12 // Stream controller. |
13 StreamController<List<int>> _controller; | 13 StreamController<List<int>> _controller; |
14 | 14 |
15 // Information about the underlying file. | 15 // Information about the underlying file. |
16 String _path; | 16 String _path; |
17 RandomAccessFile _openedFile; | 17 RandomAccessFile _openedFile; |
18 int _position; | 18 int _position; |
19 int _end; | 19 int _end; |
20 final Completer _closeCompleter = new Completer(); | 20 final Completer _closeCompleter = new Completer(); |
21 | 21 |
22 // Has the stream been paused or unsubscribed? | 22 // Has the stream been paused or unsubscribed? |
23 bool _paused = false; | |
24 bool _unsubscribed = false; | 23 bool _unsubscribed = false; |
25 | 24 |
26 // Is there a read currently in progress? | 25 // Is there a read currently in progress? |
27 bool _readInProgress = false; | 26 bool _readInProgress = true; |
28 bool _closed = false; | 27 bool _closed = false; |
29 | 28 |
30 // Block read but not yet send because stream is paused. | 29 bool _atEnd = false; |
31 List<int> _currentBlock; | |
32 | 30 |
33 _FileStream(this._path, this._position, this._end) { | 31 _FileStream(this._path, this._position, this._end) { |
34 _setupController(); | 32 if (_position == null) _position = 0; |
35 } | 33 } |
36 | 34 |
37 _FileStream.forStdin() : _position = 0 { | 35 _FileStream.forStdin() : _position = 0; |
38 _setupController(); | |
39 } | |
40 | 36 |
41 StreamSubscription<List<int>> listen(void onData(List<int> event), | 37 StreamSubscription<List<int>> listen(void onData(List<int> event), |
42 {Function onError, | 38 {Function onError, |
43 void onDone(), | 39 void onDone(), |
44 bool cancelOnError}) { | 40 bool cancelOnError}) { |
| 41 _setupController(); |
45 return _controller.stream.listen(onData, | 42 return _controller.stream.listen(onData, |
46 onError: onError, | 43 onError: onError, |
47 onDone: onDone, | 44 onDone: onDone, |
48 cancelOnError: cancelOnError); | 45 cancelOnError: cancelOnError); |
49 } | 46 } |
50 | 47 |
51 void _setupController() { | 48 void _setupController() { |
52 _controller = new StreamController<List<int>>(sync: true, | 49 _controller = new StreamController<List<int>>(sync: true, |
53 onListen: _start, | 50 onListen: _start, |
54 onPause: () => _paused = true, | 51 onResume: _readBlock, |
55 onResume: _resume, | |
56 onCancel: () { | 52 onCancel: () { |
57 _unsubscribed = true; | 53 _unsubscribed = true; |
58 return _closeFile(); | 54 return _closeFile(); |
59 }); | 55 }); |
60 } | 56 } |
61 | 57 |
62 Future _closeFile() { | 58 Future _closeFile() { |
63 if (_readInProgress || _closed) { | 59 if (_readInProgress || _closed) { |
64 return _closeCompleter.future; | 60 return _closeCompleter.future; |
65 } | 61 } |
66 _closed = true; | 62 _closed = true; |
| 63 |
67 void done() { | 64 void done() { |
68 _closeCompleter.complete(); | 65 _closeCompleter.complete(); |
69 _controller.close(); | 66 _controller.close(); |
70 } | 67 } |
71 if (_openedFile != null) { | 68 |
72 _openedFile.close() | 69 _openedFile.close() |
73 .catchError(_controller.addError) | 70 .catchError(_controller.addError) |
74 .whenComplete(done); | 71 .whenComplete(done); |
75 _openedFile = null; | |
76 } else { | |
77 done(); | |
78 } | |
79 return _closeCompleter.future; | 72 return _closeCompleter.future; |
80 } | 73 } |
81 | 74 |
82 void _readBlock() { | 75 void _readBlock() { |
83 // Don't start a new read if one is already in progress. | 76 // Don't start a new read if one is already in progress. |
84 if (_readInProgress) return; | 77 if (_readInProgress) return; |
| 78 if (_atEnd) { |
| 79 _closeFile(); |
| 80 return; |
| 81 } |
85 _readInProgress = true; | 82 _readInProgress = true; |
86 int readBytes = _BLOCK_SIZE; | 83 int readBytes = _BLOCK_SIZE; |
87 if (_end != null) { | 84 if (_end != null) { |
88 readBytes = min(readBytes, _end - _position); | 85 readBytes = min(readBytes, _end - _position); |
89 if (readBytes < 0) { | 86 if (readBytes < 0) { |
90 _readInProgress = false; | 87 _readInProgress = false; |
91 if (!_unsubscribed) { | 88 if (!_unsubscribed) { |
92 _controller.addError(new RangeError("Bad end position: $_end")); | 89 _controller.addError(new RangeError("Bad end position: $_end")); |
93 _closeFile(); | 90 _closeFile(); |
94 _unsubscribed = true; | 91 _unsubscribed = true; |
95 } | 92 } |
96 return; | 93 return; |
97 } | 94 } |
98 } | 95 } |
99 _openedFile.read(readBytes) | 96 _openedFile.read(readBytes) |
100 .whenComplete(() { | |
101 _readInProgress = false; | |
102 }) | |
103 .then((block) { | 97 .then((block) { |
104 if (_unsubscribed) { | 98 if (_unsubscribed) { |
105 _closeFile(); | 99 _closeFile(); |
106 return; | 100 return; |
107 } | 101 } |
108 if (block.length == 0) { | 102 _readInProgress = false; |
109 if (!_unsubscribed) { | 103 _position += block.length; |
110 _closeFile(); | 104 if (block.length < readBytes || |
111 _unsubscribed = true; | 105 (_end != null && _position == _end)) { |
112 } | 106 _atEnd = true; |
113 return; | |
114 } | 107 } |
115 _position += block.length; | 108 _controller.add(block); |
116 if (_paused) { | 109 if (!_controller.isPaused) { |
117 _currentBlock = block; | |
118 } else { | |
119 _controller.add(block); | |
120 _readBlock(); | 110 _readBlock(); |
121 } | 111 } |
122 }) | 112 }) |
123 .catchError((e) { | 113 .catchError((e, s) { |
124 if (!_unsubscribed) { | 114 if (!_unsubscribed) { |
125 _controller.addError(e); | 115 _controller.addError(e, s); |
126 _closeFile(); | 116 _closeFile(); |
127 _unsubscribed = true; | 117 _unsubscribed = true; |
128 } | 118 } |
129 }); | 119 }); |
130 } | 120 } |
131 | 121 |
132 void _start() { | 122 void _start() { |
133 if (_position == null) { | 123 if (_position < 0) { |
134 _position = 0; | |
135 } else if (_position < 0) { | |
136 _controller.addError(new RangeError("Bad start position: $_position")); | 124 _controller.addError(new RangeError("Bad start position: $_position")); |
137 _controller.close(); | 125 _controller.close(); |
138 return; | 126 return; |
139 } | 127 } |
140 Future<RandomAccessFile> openFuture; | 128 |
| 129 void onReady(RandomAccessFile file) { |
| 130 _openedFile = file; |
| 131 _readInProgress = false; |
| 132 _readBlock(); |
| 133 } |
| 134 |
| 135 void onOpenFile(RandomAccessFile file) { |
| 136 if (_position > 0) { |
| 137 file.setPosition(_position) |
| 138 .then(onReady, onError: (e, s) { |
| 139 _controller.addError(e, s); |
| 140 _readInProgress = false; |
| 141 _closeFile(); |
| 142 }); |
| 143 } else { |
| 144 onReady(file); |
| 145 } |
| 146 } |
| 147 |
| 148 void openFailed(error, stackTrace) { |
| 149 _controller.addError(error, stackTrace); |
| 150 _controller.close(); |
| 151 _closeCompleter.complete(); |
| 152 } |
| 153 |
141 if (_path != null) { | 154 if (_path != null) { |
142 openFuture = new File(_path).open(mode: FileMode.READ); | 155 new File(_path).open(mode: FileMode.READ) |
| 156 .then(onOpenFile, onError: openFailed); |
143 } else { | 157 } else { |
144 openFuture = new Future.value(_File._openStdioSync(0)); | 158 try { |
| 159 onOpenFile(_File._openStdioSync(0)); |
| 160 } catch (e, s) { |
| 161 openFailed(e, s); |
| 162 } |
145 } | 163 } |
146 _readInProgress = true; | |
147 openFuture | |
148 .then((RandomAccessFile opened) { | |
149 _openedFile = opened; | |
150 if (_position > 0) { | |
151 return opened.setPosition(_position); | |
152 } | |
153 }) | |
154 .whenComplete(() { | |
155 _readInProgress = false; | |
156 }) | |
157 .then((_) => _readBlock()) | |
158 .catchError((e) { | |
159 _controller.addError(e); | |
160 _closeFile(); | |
161 }); | |
162 } | |
163 | |
164 void _resume() { | |
165 _paused = false; | |
166 if (_currentBlock != null) { | |
167 _controller.add(_currentBlock); | |
168 _currentBlock = null; | |
169 } | |
170 // Resume reading unless we are already done. | |
171 if (_openedFile != null) _readBlock(); | |
172 } | 164 } |
173 } | 165 } |
174 | 166 |
175 class _FileStreamConsumer extends StreamConsumer<List<int>> { | 167 class _FileStreamConsumer extends StreamConsumer<List<int>> { |
176 File _file; | 168 File _file; |
177 Future<RandomAccessFile> _openFuture; | 169 Future<RandomAccessFile> _openFuture; |
178 | 170 |
179 _FileStreamConsumer(File this._file, FileMode mode) { | 171 _FileStreamConsumer(File this._file, FileMode mode) { |
180 _openFuture = _file.open(mode: mode); | 172 _openFuture = _file.open(mode: mode); |
181 } | 173 } |
(...skipping 738 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
920 void _checkAvailable() { | 912 void _checkAvailable() { |
921 if (_asyncDispatched) { | 913 if (_asyncDispatched) { |
922 throw new FileSystemException("An async operation is currently pending", | 914 throw new FileSystemException("An async operation is currently pending", |
923 path); | 915 path); |
924 } | 916 } |
925 if (closed) { | 917 if (closed) { |
926 throw new FileSystemException("File closed", path); | 918 throw new FileSystemException("File closed", path); |
927 } | 919 } |
928 } | 920 } |
929 } | 921 } |
OLD | NEW |