| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 part of dart.io; | |
| 6 | |
| 7 /** | |
| 8 * Helper class to wrap a [StreamConsumer<List<int>, T>] and provide utility | |
| 9 * functions for writing to the StreamConsumer directly. The [IOSink] | |
| 10 * buffers the input given by [add] and [addString] and will delay a [consume] | |
| 11 * or [addStream] until the buffer is flushed. | |
| 12 * | |
| 13 * When the [IOSink] is bound to a stream (through either [consume] | |
| 14 * or [addStream]) any call to the [IOSink] will throw a | |
| 15 * [StateError]. | |
| 16 */ | |
| 17 class IOSink<T> implements StreamConsumer<List<int>, T> { | |
| 18 final StreamConsumer<List<int>, T> _target; | |
| 19 | |
| 20 StreamController<List<int>> _controllerInstance; | |
| 21 Future<T> _pipeFuture; | |
| 22 StreamSubscription<List<int>> _bindSubscription; | |
| 23 bool _paused = true; | |
| 24 | |
| 25 IOSink(StreamConsumer<List<int>, T> target) : _target = target; | |
| 26 | |
| 27 /** | |
| 28 * Provide functionality for piping to the [IOSink]. | |
| 29 */ | |
| 30 Future<T> consume(Stream<List<int>> stream) { | |
| 31 if (_isBound) { | |
| 32 throw new StateError("IOSink is already bound to a stream"); | |
| 33 } | |
| 34 return _fillFromStream(stream); | |
| 35 } | |
| 36 | |
| 37 /** | |
| 38 * Like [consume], but will not close the target when done. | |
| 39 */ | |
| 40 Future<T> addStream(Stream<List<int>> stream) { | |
| 41 if (_isBound) { | |
| 42 throw new StateError("IOSink is already bound to a stream"); | |
| 43 } | |
| 44 return _fillFromStream(stream, unbind: true); | |
| 45 } | |
| 46 | |
| 47 /** | |
| 48 * Write a list of bytes to the target. | |
| 49 */ | |
| 50 void add(List<int> data) { | |
| 51 if (_isBound) { | |
| 52 throw new StateError("IOSink is already bound to a stream"); | |
| 53 } | |
| 54 _controller.add(data); | |
| 55 } | |
| 56 | |
| 57 /** | |
| 58 * Write a String to the target. | |
| 59 */ | |
| 60 void addString(String string, [Encoding encoding = Encoding.UTF_8]) { | |
| 61 add(_encodeString(string, encoding)); | |
| 62 } | |
| 63 | |
| 64 /** | |
| 65 * Close the target. | |
| 66 */ | |
| 67 void close() { | |
| 68 if (_isBound) { | |
| 69 throw new StateError("IOSink is already bound to a stream"); | |
| 70 } | |
| 71 _controller.close(); | |
| 72 } | |
| 73 | |
| 74 /** | |
| 75 * Get future that will complete when all data has been written to | |
| 76 * the IOSink and it has been closed. | |
| 77 */ | |
| 78 Future<T> get done { | |
| 79 _controller; | |
| 80 return _pipeFuture.then((_) => this); | |
| 81 } | |
| 82 | |
| 83 StreamController<List<int>> get _controller { | |
| 84 if (_controllerInstance == null) { | |
| 85 _controllerInstance = new StreamController<List<int>>( | |
| 86 onPauseStateChange: _onPauseStateChange, | |
| 87 onSubscriptionStateChange: _onSubscriptionStateChange); | |
| 88 _pipeFuture = _controller.stream.pipe(_target).then((_) => this); | |
| 89 } | |
| 90 return _controllerInstance; | |
| 91 } | |
| 92 | |
| 93 bool get _isBound => _bindSubscription != null; | |
| 94 | |
| 95 void _onPauseStateChange() { | |
| 96 _paused = _controller.isPaused; | |
| 97 if (_controller.isPaused) { | |
| 98 _pause(); | |
| 99 } else { | |
| 100 _resume(); | |
| 101 } | |
| 102 } | |
| 103 | |
| 104 void _pause() { | |
| 105 if (_bindSubscription != null) { | |
| 106 try { | |
| 107 // The subscription can be canceled at this point. | |
| 108 _bindSubscription.pause(); | |
| 109 } catch (e) { | |
| 110 } | |
| 111 } | |
| 112 } | |
| 113 | |
| 114 void _resume() { | |
| 115 if (_bindSubscription != null) { | |
| 116 try { | |
| 117 // The subscription can be canceled at this point. | |
| 118 _bindSubscription.resume(); | |
| 119 } catch (e) { | |
| 120 } | |
| 121 } | |
| 122 } | |
| 123 | |
| 124 void _onSubscriptionStateChange() { | |
| 125 if (_controller.hasSubscribers) { | |
| 126 _paused = false; | |
| 127 _resume(); | |
| 128 } else { | |
| 129 if (_bindSubscription != null) { | |
| 130 _bindSubscription.cancel(); | |
| 131 _bindSubscription = null; | |
| 132 } | |
| 133 } | |
| 134 } | |
| 135 | |
| 136 Future<T> _fillFromStream(Stream<List<int>> stream, {unbind: false}) { | |
| 137 _controller; | |
| 138 Completer<T> unbindCompleter; | |
| 139 if (unbind) { | |
| 140 unbindCompleter = new Completer<T>(); | |
| 141 } | |
| 142 completeUnbind([error]) { | |
| 143 if (unbindCompleter == null) return; | |
| 144 var tmp = unbindCompleter; | |
| 145 unbindCompleter = null; | |
| 146 if (error == null) { | |
| 147 _bindSubscription = null; | |
| 148 tmp.complete(); | |
| 149 } else { | |
| 150 tmp.completeError(error); | |
| 151 } | |
| 152 } | |
| 153 _bindSubscription = stream.listen( | |
| 154 _controller.add, | |
| 155 onDone: () { | |
| 156 if (unbind) { | |
| 157 completeUnbind(); | |
| 158 } else { | |
| 159 _controller.close(); | |
| 160 } | |
| 161 }, | |
| 162 onError: _controller.signalError); | |
| 163 if (_paused) _pause(); | |
| 164 if (unbind) { | |
| 165 _pipeFuture | |
| 166 .then((_) => completeUnbind(), | |
| 167 onError: (error) => completeUnbind(error)); | |
| 168 return unbindCompleter.future; | |
| 169 } else { | |
| 170 return _pipeFuture.then((_) => this); | |
| 171 } | |
| 172 } | |
| 173 } | |
| OLD | NEW |