OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 // The following code is copied from sdk/lib/io/io_sink.dart. The "dart:io" |
| 6 // implementation isn't used directly to support non-"dart:io" applications. |
| 7 // |
| 8 // Because it's copied directly, only modifications necessary to support the |
| 9 // desired public API and to remove "dart:io" dependencies have been made. |
| 10 // |
| 11 // This is up-to-date as of sdk revision |
| 12 // 86227840d75d974feb238f8b3c59c038b99c05cf. |
| 13 import 'dart:async'; |
| 14 |
| 15 class StreamSinkImpl<T> implements StreamSink<T> { |
| 16 final StreamConsumer<T> _target; |
| 17 Completer _doneCompleter = new Completer(); |
| 18 Future _doneFuture; |
| 19 StreamController<T> _controllerInstance; |
| 20 Completer _controllerCompleter; |
| 21 bool _isClosed = false; |
| 22 bool _isBound = false; |
| 23 bool _hasError = false; |
| 24 |
| 25 StreamSinkImpl(this._target) { |
| 26 _doneFuture = _doneCompleter.future; |
| 27 } |
| 28 |
| 29 void add(T data) { |
| 30 if (_isClosed) return; |
| 31 _controller.add(data); |
| 32 } |
| 33 |
| 34 void addError(error, [StackTrace stackTrace]) { |
| 35 _controller.addError(error, stackTrace); |
| 36 } |
| 37 |
| 38 Future addStream(Stream<T> stream) { |
| 39 if (_isBound) { |
| 40 throw new StateError("StreamSink is already bound to a stream"); |
| 41 } |
| 42 _isBound = true; |
| 43 if (_hasError) return done; |
| 44 // Wait for any sync operations to complete. |
| 45 Future targetAddStream() { |
| 46 return _target.addStream(stream) |
| 47 .whenComplete(() { |
| 48 _isBound = false; |
| 49 }); |
| 50 } |
| 51 if (_controllerInstance == null) return targetAddStream(); |
| 52 var future = _controllerCompleter.future; |
| 53 _controllerInstance.close(); |
| 54 return future.then((_) => targetAddStream()); |
| 55 } |
| 56 |
| 57 Future flush() { |
| 58 if (_isBound) { |
| 59 throw new StateError("StreamSink is bound to a stream"); |
| 60 } |
| 61 if (_controllerInstance == null) return new Future.value(this); |
| 62 // Adding an empty stream-controller will return a future that will complete |
| 63 // when all data is done. |
| 64 _isBound = true; |
| 65 var future = _controllerCompleter.future; |
| 66 _controllerInstance.close(); |
| 67 return future.whenComplete(() { |
| 68 _isBound = false; |
| 69 }); |
| 70 } |
| 71 |
| 72 Future close() { |
| 73 if (_isBound) { |
| 74 throw new StateError("StreamSink is bound to a stream"); |
| 75 } |
| 76 if (!_isClosed) { |
| 77 _isClosed = true; |
| 78 if (_controllerInstance != null) { |
| 79 _controllerInstance.close(); |
| 80 } else { |
| 81 _closeTarget(); |
| 82 } |
| 83 } |
| 84 return done; |
| 85 } |
| 86 |
| 87 void _closeTarget() { |
| 88 _target.close().then(_completeDoneValue, onError: _completeDoneError); |
| 89 } |
| 90 |
| 91 Future get done => _doneFuture; |
| 92 |
| 93 void _completeDoneValue(value) { |
| 94 if (_doneCompleter == null) return; |
| 95 _doneCompleter.complete(value); |
| 96 _doneCompleter = null; |
| 97 } |
| 98 |
| 99 void _completeDoneError(error, StackTrace stackTrace) { |
| 100 if (_doneCompleter == null) return; |
| 101 _hasError = true; |
| 102 _doneCompleter.completeError(error, stackTrace); |
| 103 _doneCompleter = null; |
| 104 } |
| 105 |
| 106 StreamController<T> get _controller { |
| 107 if (_isBound) { |
| 108 throw new StateError("StreamSink is bound to a stream"); |
| 109 } |
| 110 if (_isClosed) { |
| 111 throw new StateError("StreamSink is closed"); |
| 112 } |
| 113 if (_controllerInstance == null) { |
| 114 _controllerInstance = new StreamController<T>(sync: true); |
| 115 _controllerCompleter = new Completer(); |
| 116 _target.addStream(_controller.stream) |
| 117 .then( |
| 118 (_) { |
| 119 if (_isBound) { |
| 120 // A new stream takes over - forward values to that stream. |
| 121 _controllerCompleter.complete(this); |
| 122 _controllerCompleter = null; |
| 123 _controllerInstance = null; |
| 124 } else { |
| 125 // No new stream, .close was called. Close _target. |
| 126 _closeTarget(); |
| 127 } |
| 128 }, |
| 129 onError: (error, stackTrace) { |
| 130 if (_isBound) { |
| 131 // A new stream takes over - forward errors to that stream. |
| 132 _controllerCompleter.completeError(error, stackTrace); |
| 133 _controllerCompleter = null; |
| 134 _controllerInstance = null; |
| 135 } else { |
| 136 // No new stream. No need to close target, as it have already |
| 137 // failed. |
| 138 _completeDoneError(error, stackTrace); |
| 139 } |
| 140 }); |
| 141 } |
| 142 return _controllerInstance; |
| 143 } |
| 144 } |
| 145 |
OLD | NEW |