Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(55)

Side by Side Diff: sdk/lib/io/file_impl.dart

Issue 13680002: StreamConsumer has an addStream and a close functions. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Update comments. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/io/http_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 // Read the file in blocks of size 64k. 7 // Read the file in blocks of size 64k.
8 const int _BLOCK_SIZE = 64 * 1024; 8 const int _BLOCK_SIZE = 64 * 1024;
9 9
10 10
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after
144 } else { 144 } else {
145 _resume(); 145 _resume();
146 } 146 }
147 } 147 }
148 } 148 }
149 149
150 class _FileStreamConsumer extends StreamConsumer<List<int>, File> { 150 class _FileStreamConsumer extends StreamConsumer<List<int>, File> {
151 File _file; 151 File _file;
152 Future<RandomAccessFile> _openFuture; 152 Future<RandomAccessFile> _openFuture;
153 StreamSubscription _subscription; 153 StreamSubscription _subscription;
154
154 155
155 _FileStreamConsumer(File this._file, FileMode mode) { 156 _FileStreamConsumer(File this._file, FileMode mode) {
156 _openFuture = _file.open(mode: mode); 157 _openFuture = _file.open(mode: mode);
157 } 158 }
158 159
159 _FileStreamConsumer.fromStdio(int fd) { 160 _FileStreamConsumer.fromStdio(int fd) {
160 assert(1 <= fd && fd <= 2); 161 assert(1 <= fd && fd <= 2);
161 _openFuture = new Future.immediate(_File._openStdioSync(fd)); 162 _openFuture = new Future.immediate(_File._openStdioSync(fd));
162 } 163 }
163 164
164 Future<File> consume(Stream<List<int>> stream) { 165 Future<File> consume(Stream<List<int>> stream) {
166 return addStream(stream).then((_) => close());
167 }
168
169 Future<File> addStream(Stream<List<int>> stream) {
165 Completer<File> completer = new Completer<File>(); 170 Completer<File> completer = new Completer<File>();
166 _openFuture 171 _openFuture
167 .then((openedFile) { 172 .then((openedFile) {
168 _subscription = stream.listen( 173 _subscription = stream.listen(
169 (d) { 174 (d) {
170 _subscription.pause(); 175 _subscription.pause();
171 openedFile.writeList(d, 0, d.length) 176 openedFile.writeList(d, 0, d.length)
172 .then((_) => _subscription.resume()) 177 .then((_) => _subscription.resume())
173 .catchError((e) { 178 .catchError((e) {
174 openedFile.close(); 179 openedFile.close();
175 completer.completeError(e); 180 completer.completeError(e);
176 }); 181 });
177 }, 182 },
178 onDone: () { 183 onDone: () {
179 // Wait for the file to close (and therefore flush) before 184 completer.complete(_file);
180 // completing the future.
181 openedFile.close()
182 .then((_) {
183 completer.complete(_file);
184 })
185 .catchError((e) {
186 completer.completeError(e);
187 });
188 }, 185 },
189 onError: (e) { 186 onError: (e) {
190 openedFile.close(); 187 openedFile.close();
191 completer.completeError(e); 188 completer.completeError(e);
192 }, 189 },
193 unsubscribeOnError: true); 190 unsubscribeOnError: true);
194 }) 191 })
195 .catchError((e) { 192 .catchError((e) {
196 completer.completeError(e); 193 completer.completeError(e);
197 }); 194 });
198 return completer.future; 195 return completer.future;
199 } 196 }
197
198 Future<File> close() {
199 return _openFuture.then((openedFile) => openedFile.close());
200 }
200 } 201 }
201 202
202 203
203 const int _EXISTS_REQUEST = 0; 204 const int _EXISTS_REQUEST = 0;
204 const int _CREATE_REQUEST = 1; 205 const int _CREATE_REQUEST = 1;
205 const int _DELETE_REQUEST = 2; 206 const int _DELETE_REQUEST = 2;
206 const int _OPEN_REQUEST = 3; 207 const int _OPEN_REQUEST = 3;
207 const int _FULL_PATH_REQUEST = 4; 208 const int _FULL_PATH_REQUEST = 4;
208 const int _DIRECTORY_REQUEST = 5; 209 const int _DIRECTORY_REQUEST = 5;
209 const int _CLOSE_REQUEST = 6; 210 const int _CLOSE_REQUEST = 6;
(...skipping 840 matching lines...) Expand 10 before | Expand all | Expand 10 after
1050 new FileIOException("File closed '$_path'")); 1051 new FileIOException("File closed '$_path'"));
1051 }); 1052 });
1052 return completer.future; 1053 return completer.future;
1053 } 1054 }
1054 1055
1055 final String _path; 1056 final String _path;
1056 int _id; 1057 int _id;
1057 1058
1058 SendPort _fileService; 1059 SendPort _fileService;
1059 } 1060 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/io/http_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698