OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 // Read the file in blocks of size 64k. | 7 // Read the file in blocks of size 64k. |
8 const int _BLOCK_SIZE = 64 * 1024; | 8 const int _BLOCK_SIZE = 64 * 1024; |
9 | 9 |
10 | 10 |
11 class _FileStream extends Stream<List<int>> { | 11 class _FileStream extends Stream<List<int>> { |
12 // Stream controller. | 12 // Stream controller. |
13 StreamController<List<int>> _controller; | 13 StreamController<List<int>> _controller; |
14 | 14 |
15 // Information about the underlying file. | 15 // Information about the underlying file. |
16 String _path; | 16 String _path; |
17 RandomAccessFile _openedFile; | 17 RandomAccessFile _openedFile; |
18 int _position; | 18 int _position; |
19 int _end; | 19 int _end; |
| 20 final Completer _closeCompleter = new Completer(); |
20 | 21 |
21 // Has the stream been paused or unsubscribed? | 22 // Has the stream been paused or unsubscribed? |
22 bool _paused = false; | 23 bool _paused = false; |
23 bool _unsubscribed = false; | 24 bool _unsubscribed = false; |
24 | 25 |
25 // Is there a read currently in progress? | 26 // Is there a read currently in progress? |
26 bool _readInProgress = false; | 27 bool _readInProgress = false; |
27 | 28 |
28 // Block read but not yet send because stream is paused. | 29 // Block read but not yet send because stream is paused. |
29 List<int> _currentBlock; | 30 List<int> _currentBlock; |
(...skipping 21 matching lines...) Expand all Loading... |
51 onListen: _start, | 52 onListen: _start, |
52 onPause: () => _paused = true, | 53 onPause: () => _paused = true, |
53 onResume: _resume, | 54 onResume: _resume, |
54 onCancel: () { | 55 onCancel: () { |
55 _unsubscribed = true; | 56 _unsubscribed = true; |
56 _closeFile(); | 57 _closeFile(); |
57 }); | 58 }); |
58 } | 59 } |
59 | 60 |
60 Future _closeFile() { | 61 Future _closeFile() { |
61 Future closeFuture; | 62 if (_readInProgress) { |
| 63 return _closeCompleter.future; |
| 64 } |
62 if (_openedFile != null) { | 65 if (_openedFile != null) { |
63 Future closeFuture = _openedFile.close(); | 66 _openedFile.close() |
| 67 .then(_closeCompleter.complete, |
| 68 onError: _closeCompleter.completeError); |
64 _openedFile = null; | 69 _openedFile = null; |
65 return closeFuture; | |
66 } else { | |
67 return new Future.value(); | |
68 } | 70 } |
| 71 return _closeCompleter.future; |
69 } | 72 } |
70 | 73 |
71 void _readBlock() { | 74 void _readBlock() { |
72 // Don't start a new read if one is already in progress. | 75 // Don't start a new read if one is already in progress. |
73 if (_readInProgress) return; | 76 if (_readInProgress) return; |
74 _readInProgress = true; | 77 _readInProgress = true; |
75 int readBytes = _BLOCK_SIZE; | 78 int readBytes = _BLOCK_SIZE; |
76 if (_end != null) { | 79 if (_end != null) { |
77 readBytes = min(readBytes, _end - _position); | 80 readBytes = min(readBytes, _end - _position); |
78 if (readBytes < 0) { | 81 if (readBytes < 0) { |
| 82 _readInProgress = false; |
79 if (!_unsubscribed) { | 83 if (!_unsubscribed) { |
80 _controller.addError(new RangeError("Bad end position: $_end")); | 84 _controller.addError(new RangeError("Bad end position: $_end")); |
81 _closeFile().then((_) { _controller.close(); }); | 85 _closeFile().then((_) { _controller.close(); }); |
82 _unsubscribed = true; | 86 _unsubscribed = true; |
83 } | 87 } |
84 return; | 88 return; |
85 } | 89 } |
86 } | 90 } |
87 _openedFile.read(readBytes) | 91 _openedFile.read(readBytes) |
| 92 .whenComplete(() { |
| 93 _readInProgress = false; |
| 94 }) |
88 .then((block) { | 95 .then((block) { |
89 _readInProgress = false; | 96 if (_unsubscribed) { |
| 97 _closeFile(); |
| 98 return; |
| 99 } |
90 if (block.length == 0) { | 100 if (block.length == 0) { |
91 if (!_unsubscribed) { | 101 if (!_unsubscribed) { |
92 _closeFile().then((_) { _controller.close(); }); | 102 _closeFile().then((_) { _controller.close(); }); |
93 _unsubscribed = true; | 103 _unsubscribed = true; |
94 } | 104 } |
95 return; | 105 return; |
96 } | 106 } |
97 _position += block.length; | 107 _position += block.length; |
98 if (_paused) { | 108 if (_paused) { |
99 _currentBlock = block; | 109 _currentBlock = block; |
(...skipping 756 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
856 void _checkNotClosed() { | 866 void _checkNotClosed() { |
857 if (closed) { | 867 if (closed) { |
858 throw new FileException("File closed", path); | 868 throw new FileException("File closed", path); |
859 } | 869 } |
860 } | 870 } |
861 | 871 |
862 Future _closedException() { | 872 Future _closedException() { |
863 return new Future.error(new FileException("File closed", path)); | 873 return new Future.error(new FileException("File closed", path)); |
864 } | 874 } |
865 } | 875 } |
OLD | NEW |