OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 // The following code is copied from sdk/lib/io/io_sink.dart. The "dart:io" |
| 6 // implementation isn't used directly to support non-"dart:io" applications. |
| 7 // |
| 8 // Because it's copied directly, only modifications necessary to support the |
| 9 // desired public API and to remove "dart:io" dependencies have been made. |
| 10 // |
| 11 // This is up-to-date as of sdk revision |
| 12 // 86227840d75d974feb238f8b3c59c038b99c05cf. |
| 13 library http_parser.copy.io_sink; |
| 14 |
| 15 import 'dart:async'; |
| 16 |
| 17 class StreamSinkImpl<T> implements StreamSink<T> { |
| 18 final StreamConsumer<T> _target; |
| 19 Completer _doneCompleter = new Completer(); |
| 20 Future _doneFuture; |
| 21 StreamController<T> _controllerInstance; |
| 22 Completer _controllerCompleter; |
| 23 bool _isClosed = false; |
| 24 bool _isBound = false; |
| 25 bool _hasError = false; |
| 26 |
| 27 StreamSinkImpl(this._target) { |
| 28 _doneFuture = _doneCompleter.future; |
| 29 } |
| 30 |
| 31 void add(T data) { |
| 32 if (_isClosed) return; |
| 33 _controller.add(data); |
| 34 } |
| 35 |
| 36 void addError(error, [StackTrace stackTrace]) { |
| 37 _controller.addError(error, stackTrace); |
| 38 } |
| 39 |
| 40 Future addStream(Stream<T> stream) { |
| 41 if (_isBound) { |
| 42 throw new StateError("StreamSink is already bound to a stream"); |
| 43 } |
| 44 _isBound = true; |
| 45 if (_hasError) return done; |
| 46 // Wait for any sync operations to complete. |
| 47 Future targetAddStream() { |
| 48 return _target.addStream(stream) |
| 49 .whenComplete(() { |
| 50 _isBound = false; |
| 51 }); |
| 52 } |
| 53 if (_controllerInstance == null) return targetAddStream(); |
| 54 var future = _controllerCompleter.future; |
| 55 _controllerInstance.close(); |
| 56 return future.then((_) => targetAddStream()); |
| 57 } |
| 58 |
| 59 Future flush() { |
| 60 if (_isBound) { |
| 61 throw new StateError("StreamSink is bound to a stream"); |
| 62 } |
| 63 if (_controllerInstance == null) return new Future.value(this); |
| 64 // Adding an empty stream-controller will return a future that will complete |
| 65 // when all data is done. |
| 66 _isBound = true; |
| 67 var future = _controllerCompleter.future; |
| 68 _controllerInstance.close(); |
| 69 return future.whenComplete(() { |
| 70 _isBound = false; |
| 71 }); |
| 72 } |
| 73 |
| 74 Future close() { |
| 75 if (_isBound) { |
| 76 throw new StateError("StreamSink is bound to a stream"); |
| 77 } |
| 78 if (!_isClosed) { |
| 79 _isClosed = true; |
| 80 if (_controllerInstance != null) { |
| 81 _controllerInstance.close(); |
| 82 } else { |
| 83 _closeTarget(); |
| 84 } |
| 85 } |
| 86 return done; |
| 87 } |
| 88 |
| 89 void _closeTarget() { |
| 90 _target.close().then(_completeDoneValue, onError: _completeDoneError); |
| 91 } |
| 92 |
| 93 Future get done => _doneFuture; |
| 94 |
| 95 void _completeDoneValue(value) { |
| 96 if (_doneCompleter == null) return; |
| 97 _doneCompleter.complete(value); |
| 98 _doneCompleter = null; |
| 99 } |
| 100 |
| 101 void _completeDoneError(error, StackTrace stackTrace) { |
| 102 if (_doneCompleter == null) return; |
| 103 _hasError = true; |
| 104 _doneCompleter.completeError(error, stackTrace); |
| 105 _doneCompleter = null; |
| 106 } |
| 107 |
| 108 StreamController<T> get _controller { |
| 109 if (_isBound) { |
| 110 throw new StateError("StreamSink is bound to a stream"); |
| 111 } |
| 112 if (_isClosed) { |
| 113 throw new StateError("StreamSink is closed"); |
| 114 } |
| 115 if (_controllerInstance == null) { |
| 116 _controllerInstance = new StreamController<T>(sync: true); |
| 117 _controllerCompleter = new Completer(); |
| 118 _target.addStream(_controller.stream) |
| 119 .then( |
| 120 (_) { |
| 121 if (_isBound) { |
| 122 // A new stream takes over - forward values to that stream. |
| 123 _controllerCompleter.complete(this); |
| 124 _controllerCompleter = null; |
| 125 _controllerInstance = null; |
| 126 } else { |
| 127 // No new stream, .close was called. Close _target. |
| 128 _closeTarget(); |
| 129 } |
| 130 }, |
| 131 onError: (error, stackTrace) { |
| 132 if (_isBound) { |
| 133 // A new stream takes over - forward errors to that stream. |
| 134 _controllerCompleter.completeError(error, stackTrace); |
| 135 _controllerCompleter = null; |
| 136 _controllerInstance = null; |
| 137 } else { |
| 138 // No new stream. No need to close target, as it have already |
| 139 // failed. |
| 140 _completeDoneError(error, stackTrace); |
| 141 } |
| 142 }); |
| 143 } |
| 144 return _controllerInstance; |
| 145 } |
| 146 } |
| 147 |
OLD | NEW |