OLD | NEW |
| (Empty) |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 part of dart.io; | |
6 | |
7 /** | |
8 * Helper class to wrap a [StreamConsumer<List<int>, T>] and provide utility | |
9 * functions for writing to the StreamConsumer directly. The [IOSink] | |
10 * buffers the input given by [add] and [addString] and will delay a [consume] | |
11 * or [addStream] until the buffer is flushed. | |
12 * | |
13 * When the [IOSink] is bound to a stream (through either [consume] | |
14 * or [addStream]) any call to the [IOSink] will throw a | |
15 * [StateError]. | |
16 */ | |
17 class IOSink<T> implements StreamConsumer<List<int>, T> { | |
18 final StreamConsumer<List<int>, T> _target; | |
19 | |
20 StreamController<List<int>> _controllerInstance; | |
21 Future<T> _pipeFuture; | |
22 StreamSubscription<List<int>> _bindSubscription; | |
23 bool _paused = true; | |
24 | |
25 IOSink(StreamConsumer<List<int>, T> target) : _target = target; | |
26 | |
27 /** | |
28 * Provide functionality for piping to the [IOSink]. | |
29 */ | |
30 Future<T> consume(Stream<List<int>> stream) { | |
31 if (_isBound) { | |
32 throw new StateError("IOSink is already bound to a stream"); | |
33 } | |
34 return _fillFromStream(stream); | |
35 } | |
36 | |
37 /** | |
38 * Like [consume], but will not close the target when done. | |
39 */ | |
40 Future<T> addStream(Stream<List<int>> stream) { | |
41 if (_isBound) { | |
42 throw new StateError("IOSink is already bound to a stream"); | |
43 } | |
44 return _fillFromStream(stream, unbind: true); | |
45 } | |
46 | |
47 /** | |
48 * Write a list of bytes to the target. | |
49 */ | |
50 void add(List<int> data) { | |
51 if (_isBound) { | |
52 throw new StateError("IOSink is already bound to a stream"); | |
53 } | |
54 _controller.add(data); | |
55 } | |
56 | |
57 /** | |
58 * Write a String to the target. | |
59 */ | |
60 void addString(String string, [Encoding encoding = Encoding.UTF_8]) { | |
61 add(_encodeString(string, encoding)); | |
62 } | |
63 | |
64 /** | |
65 * Close the target. | |
66 */ | |
67 void close() { | |
68 if (_isBound) { | |
69 throw new StateError("IOSink is already bound to a stream"); | |
70 } | |
71 _controller.close(); | |
72 } | |
73 | |
74 /** | |
75 * Get future that will complete when all data has been written to | |
76 * the IOSink and it has been closed. | |
77 */ | |
78 Future<T> get done { | |
79 _controller; | |
80 return _pipeFuture.then((_) => this); | |
81 } | |
82 | |
83 StreamController<List<int>> get _controller { | |
84 if (_controllerInstance == null) { | |
85 _controllerInstance = new StreamController<List<int>>( | |
86 onPauseStateChange: _onPauseStateChange, | |
87 onSubscriptionStateChange: _onSubscriptionStateChange); | |
88 _pipeFuture = _controller.stream.pipe(_target).then((_) => this); | |
89 } | |
90 return _controllerInstance; | |
91 } | |
92 | |
93 bool get _isBound => _bindSubscription != null; | |
94 | |
95 void _onPauseStateChange() { | |
96 _paused = _controller.isPaused; | |
97 if (_controller.isPaused) { | |
98 _pause(); | |
99 } else { | |
100 _resume(); | |
101 } | |
102 } | |
103 | |
104 void _pause() { | |
105 if (_bindSubscription != null) { | |
106 try { | |
107 // The subscription can be canceled at this point. | |
108 _bindSubscription.pause(); | |
109 } catch (e) { | |
110 } | |
111 } | |
112 } | |
113 | |
114 void _resume() { | |
115 if (_bindSubscription != null) { | |
116 try { | |
117 // The subscription can be canceled at this point. | |
118 _bindSubscription.resume(); | |
119 } catch (e) { | |
120 } | |
121 } | |
122 } | |
123 | |
124 void _onSubscriptionStateChange() { | |
125 if (_controller.hasSubscribers) { | |
126 _paused = false; | |
127 _resume(); | |
128 } else { | |
129 if (_bindSubscription != null) { | |
130 _bindSubscription.cancel(); | |
131 _bindSubscription = null; | |
132 } | |
133 } | |
134 } | |
135 | |
136 Future<T> _fillFromStream(Stream<List<int>> stream, {unbind: false}) { | |
137 _controller; | |
138 Completer<T> unbindCompleter; | |
139 if (unbind) { | |
140 unbindCompleter = new Completer<T>(); | |
141 } | |
142 completeUnbind([error]) { | |
143 if (unbindCompleter == null) return; | |
144 var tmp = unbindCompleter; | |
145 unbindCompleter = null; | |
146 if (error == null) { | |
147 _bindSubscription = null; | |
148 tmp.complete(); | |
149 } else { | |
150 tmp.completeError(error); | |
151 } | |
152 } | |
153 _bindSubscription = stream.listen( | |
154 _controller.add, | |
155 onDone: () { | |
156 if (unbind) { | |
157 completeUnbind(); | |
158 } else { | |
159 _controller.close(); | |
160 } | |
161 }, | |
162 onError: _controller.signalError); | |
163 if (_paused) _pause(); | |
164 if (unbind) { | |
165 _pipeFuture | |
166 .then((_) => completeUnbind(), | |
167 onError: (error) => completeUnbind(error)); | |
168 return unbindCompleter.future; | |
169 } else { | |
170 return _pipeFuture.then((_) => this); | |
171 } | |
172 } | |
173 } | |
OLD | NEW |