Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(43)

Side by Side Diff: sdk/lib/io/file_impl.dart

Issue 227733002: Don't call read, if last read returned less bytes than was being read. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 // Read the file in blocks of size 64k. 7 // Read the file in blocks of size 64k.
8 const int _BLOCK_SIZE = 64 * 1024; 8 const int _BLOCK_SIZE = 64 * 1024;
9 9
10 10
11 class _FileStream extends Stream<List<int>> { 11 class _FileStream extends Stream<List<int>> {
12 // Stream controller. 12 // Stream controller.
13 StreamController<List<int>> _controller; 13 StreamController<List<int>> _controller;
14 14
15 // Information about the underlying file. 15 // Information about the underlying file.
16 String _path; 16 String _path;
17 RandomAccessFile _openedFile; 17 RandomAccessFile _openedFile;
18 int _position; 18 int _position;
19 int _end; 19 int _end;
20 final Completer _closeCompleter = new Completer(); 20 final Completer _closeCompleter = new Completer();
21 21
22 // Has the stream been paused or unsubscribed? 22 // Has the stream been paused or unsubscribed?
23 bool _paused = false;
24 bool _unsubscribed = false; 23 bool _unsubscribed = false;
25 24
26 // Is there a read currently in progress? 25 // Is there a read currently in progress?
27 bool _readInProgress = false; 26 bool _readInProgress = true;
28 bool _closed = false; 27 bool _closed = false;
29 28
30 // Block read but not yet send because stream is paused. 29 bool _atEnd = false;
31 List<int> _currentBlock;
32 30
33 _FileStream(this._path, this._position, this._end) { 31 _FileStream(this._path, this._position, this._end) {
34 _setupController(); 32 if (_position == null) _position = 0;
35 } 33 }
36 34
37 _FileStream.forStdin() : _position = 0 { 35 _FileStream.forStdin() : _position = 0;
38 _setupController();
39 }
40 36
41 StreamSubscription<List<int>> listen(void onData(List<int> event), 37 StreamSubscription<List<int>> listen(void onData(List<int> event),
42 {Function onError, 38 {Function onError,
43 void onDone(), 39 void onDone(),
44 bool cancelOnError}) { 40 bool cancelOnError}) {
41 _setupController();
45 return _controller.stream.listen(onData, 42 return _controller.stream.listen(onData,
46 onError: onError, 43 onError: onError,
47 onDone: onDone, 44 onDone: onDone,
48 cancelOnError: cancelOnError); 45 cancelOnError: cancelOnError);
49 } 46 }
50 47
51 void _setupController() { 48 void _setupController() {
52 _controller = new StreamController<List<int>>(sync: true, 49 _controller = new StreamController<List<int>>(sync: true,
53 onListen: _start, 50 onListen: _start,
54 onPause: () => _paused = true, 51 onResume: _readBlock,
55 onResume: _resume,
56 onCancel: () { 52 onCancel: () {
57 _unsubscribed = true; 53 _unsubscribed = true;
58 return _closeFile(); 54 return _closeFile();
59 }); 55 });
60 } 56 }
61 57
62 Future _closeFile() { 58 Future _closeFile() {
63 if (_readInProgress || _closed) { 59 if (_readInProgress || _closed) {
64 return _closeCompleter.future; 60 return _closeCompleter.future;
65 } 61 }
66 _closed = true; 62 _closed = true;
63
67 void done() { 64 void done() {
68 _closeCompleter.complete(); 65 _closeCompleter.complete();
69 _controller.close(); 66 _controller.close();
70 } 67 }
71 if (_openedFile != null) { 68
72 _openedFile.close() 69 _openedFile.close()
73 .catchError(_controller.addError) 70 .catchError(_controller.addError)
74 .whenComplete(done); 71 .whenComplete(done);
75 _openedFile = null;
76 } else {
77 done();
78 }
79 return _closeCompleter.future; 72 return _closeCompleter.future;
80 } 73 }
81 74
82 void _readBlock() { 75 void _readBlock() {
83 // Don't start a new read if one is already in progress. 76 // Don't start a new read if one is already in progress.
84 if (_readInProgress) return; 77 if (_readInProgress) return;
78 if (_atEnd) {
79 _closeFile();
80 return;
81 }
85 _readInProgress = true; 82 _readInProgress = true;
86 int readBytes = _BLOCK_SIZE; 83 int readBytes = _BLOCK_SIZE;
87 if (_end != null) { 84 if (_end != null) {
88 readBytes = min(readBytes, _end - _position); 85 readBytes = min(readBytes, _end - _position);
89 if (readBytes < 0) { 86 if (readBytes < 0) {
90 _readInProgress = false; 87 _readInProgress = false;
91 if (!_unsubscribed) { 88 if (!_unsubscribed) {
92 _controller.addError(new RangeError("Bad end position: $_end")); 89 _controller.addError(new RangeError("Bad end position: $_end"));
93 _closeFile(); 90 _closeFile();
94 _unsubscribed = true; 91 _unsubscribed = true;
95 } 92 }
96 return; 93 return;
97 } 94 }
98 } 95 }
99 _openedFile.read(readBytes) 96 _openedFile.read(readBytes)
100 .whenComplete(() {
101 _readInProgress = false;
102 })
103 .then((block) { 97 .then((block) {
104 if (_unsubscribed) { 98 if (_unsubscribed) {
105 _closeFile(); 99 _closeFile();
106 return; 100 return;
107 } 101 }
108 if (block.length == 0) { 102 _readInProgress = false;
109 if (!_unsubscribed) { 103 _position += block.length;
110 _closeFile(); 104 if (block.length < readBytes ||
111 _unsubscribed = true; 105 (_end != null && _position == _end)) {
112 } 106 _atEnd = true;
113 return;
114 } 107 }
115 _position += block.length; 108 _controller.add(block);
116 if (_paused) { 109 if (!_controller.isPaused) {
117 _currentBlock = block;
118 } else {
119 _controller.add(block);
120 _readBlock(); 110 _readBlock();
121 } 111 }
122 }) 112 })
123 .catchError((e) { 113 .catchError((e, s) {
124 if (!_unsubscribed) { 114 if (!_unsubscribed) {
125 _controller.addError(e); 115 _controller.addError(e, s);
126 _closeFile(); 116 _closeFile();
127 _unsubscribed = true; 117 _unsubscribed = true;
128 } 118 }
129 }); 119 });
130 } 120 }
131 121
132 void _start() { 122 void _start() {
133 if (_position == null) { 123 if (_position < 0) {
134 _position = 0;
135 } else if (_position < 0) {
136 _controller.addError(new RangeError("Bad start position: $_position")); 124 _controller.addError(new RangeError("Bad start position: $_position"));
137 _controller.close(); 125 _controller.close();
138 return; 126 return;
139 } 127 }
140 Future<RandomAccessFile> openFuture; 128
129 void onReady(RandomAccessFile file) {
130 _openedFile = file;
131 _readInProgress = false;
132 _readBlock();
133 }
134
135 void onOpenFile(RandomAccessFile file) {
136 if (_position > 0) {
137 file.setPosition(_position)
138 .then(onReady, onError: (e, s) {
139 _controller.addError(e, s);
140 _readInProgress = false;
141 _closeFile();
142 });
143 } else {
144 onReady(file);
145 }
146 }
147
148 void openFailed(error, stackTrace) {
149 _controller.addError(error, stackTrace);
150 _controller.close();
151 _closeCompleter.complete();
152 }
153
141 if (_path != null) { 154 if (_path != null) {
142 openFuture = new File(_path).open(mode: FileMode.READ); 155 new File(_path).open(mode: FileMode.READ)
156 .then(onOpenFile, onError: openFailed);
143 } else { 157 } else {
144 openFuture = new Future.value(_File._openStdioSync(0)); 158 try {
159 onOpenFile(_File._openStdioSync(0));
160 } catch (e, s) {
161 openFailed(e, s);
162 }
145 } 163 }
146 _readInProgress = true;
147 openFuture
148 .then((RandomAccessFile opened) {
149 _openedFile = opened;
150 if (_position > 0) {
151 return opened.setPosition(_position);
152 }
153 })
154 .whenComplete(() {
155 _readInProgress = false;
156 })
157 .then((_) => _readBlock())
158 .catchError((e) {
159 _controller.addError(e);
160 _closeFile();
161 });
162 }
163
164 void _resume() {
165 _paused = false;
166 if (_currentBlock != null) {
167 _controller.add(_currentBlock);
168 _currentBlock = null;
169 }
170 // Resume reading unless we are already done.
171 if (_openedFile != null) _readBlock();
172 } 164 }
173 } 165 }
174 166
175 class _FileStreamConsumer extends StreamConsumer<List<int>> { 167 class _FileStreamConsumer extends StreamConsumer<List<int>> {
176 File _file; 168 File _file;
177 Future<RandomAccessFile> _openFuture; 169 Future<RandomAccessFile> _openFuture;
178 170
179 _FileStreamConsumer(File this._file, FileMode mode) { 171 _FileStreamConsumer(File this._file, FileMode mode) {
180 _openFuture = _file.open(mode: mode); 172 _openFuture = _file.open(mode: mode);
181 } 173 }
(...skipping 738 matching lines...) Expand 10 before | Expand all | Expand 10 after
920 void _checkAvailable() { 912 void _checkAvailable() {
921 if (_asyncDispatched) { 913 if (_asyncDispatched) {
922 throw new FileSystemException("An async operation is currently pending", 914 throw new FileSystemException("An async operation is currently pending",
923 path); 915 path);
924 } 916 }
925 if (closed) { 917 if (closed) {
926 throw new FileSystemException("File closed", path); 918 throw new FileSystemException("File closed", path);
927 } 919 }
928 } 920 }
929 } 921 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698