| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 // The following code is copied from sdk/lib/io/io_sink.dart. The "dart:io" | |
| 6 // implementation isn't used directly to support non-"dart:io" applications. | |
| 7 // | |
| 8 // Because it's copied directly, only modifications necessary to support the | |
| 9 // desired public API and to remove "dart:io" dependencies have been made. | |
| 10 // | |
| 11 // This is up-to-date as of sdk revision | |
| 12 // 86227840d75d974feb238f8b3c59c038b99c05cf. | |
| 13 import 'dart:async'; | |
| 14 | |
| 15 class StreamSinkImpl<T> implements StreamSink<T> { | |
| 16 final StreamConsumer<T> _target; | |
| 17 Completer _doneCompleter = new Completer(); | |
| 18 Future _doneFuture; | |
| 19 StreamController<T> _controllerInstance; | |
| 20 Completer _controllerCompleter; | |
| 21 bool _isClosed = false; | |
| 22 bool _isBound = false; | |
| 23 bool _hasError = false; | |
| 24 | |
| 25 StreamSinkImpl(this._target) { | |
| 26 _doneFuture = _doneCompleter.future; | |
| 27 } | |
| 28 | |
| 29 void add(T data) { | |
| 30 if (_isClosed) return; | |
| 31 _controller.add(data); | |
| 32 } | |
| 33 | |
| 34 void addError(error, [StackTrace stackTrace]) { | |
| 35 _controller.addError(error, stackTrace); | |
| 36 } | |
| 37 | |
| 38 Future addStream(Stream<T> stream) { | |
| 39 if (_isBound) { | |
| 40 throw new StateError("StreamSink is already bound to a stream"); | |
| 41 } | |
| 42 _isBound = true; | |
| 43 if (_hasError) return done; | |
| 44 // Wait for any sync operations to complete. | |
| 45 Future targetAddStream() { | |
| 46 return _target.addStream(stream) | |
| 47 .whenComplete(() { | |
| 48 _isBound = false; | |
| 49 }); | |
| 50 } | |
| 51 if (_controllerInstance == null) return targetAddStream(); | |
| 52 var future = _controllerCompleter.future; | |
| 53 _controllerInstance.close(); | |
| 54 return future.then((_) => targetAddStream()); | |
| 55 } | |
| 56 | |
| 57 Future flush() { | |
| 58 if (_isBound) { | |
| 59 throw new StateError("StreamSink is bound to a stream"); | |
| 60 } | |
| 61 if (_controllerInstance == null) return new Future.value(this); | |
| 62 // Adding an empty stream-controller will return a future that will complete | |
| 63 // when all data is done. | |
| 64 _isBound = true; | |
| 65 var future = _controllerCompleter.future; | |
| 66 _controllerInstance.close(); | |
| 67 return future.whenComplete(() { | |
| 68 _isBound = false; | |
| 69 }); | |
| 70 } | |
| 71 | |
| 72 Future close() { | |
| 73 if (_isBound) { | |
| 74 throw new StateError("StreamSink is bound to a stream"); | |
| 75 } | |
| 76 if (!_isClosed) { | |
| 77 _isClosed = true; | |
| 78 if (_controllerInstance != null) { | |
| 79 _controllerInstance.close(); | |
| 80 } else { | |
| 81 _closeTarget(); | |
| 82 } | |
| 83 } | |
| 84 return done; | |
| 85 } | |
| 86 | |
| 87 void _closeTarget() { | |
| 88 _target.close().then(_completeDoneValue, onError: _completeDoneError); | |
| 89 } | |
| 90 | |
| 91 Future get done => _doneFuture; | |
| 92 | |
| 93 void _completeDoneValue(value) { | |
| 94 if (_doneCompleter == null) return; | |
| 95 _doneCompleter.complete(value); | |
| 96 _doneCompleter = null; | |
| 97 } | |
| 98 | |
| 99 void _completeDoneError(error, StackTrace stackTrace) { | |
| 100 if (_doneCompleter == null) return; | |
| 101 _hasError = true; | |
| 102 _doneCompleter.completeError(error, stackTrace); | |
| 103 _doneCompleter = null; | |
| 104 } | |
| 105 | |
| 106 StreamController<T> get _controller { | |
| 107 if (_isBound) { | |
| 108 throw new StateError("StreamSink is bound to a stream"); | |
| 109 } | |
| 110 if (_isClosed) { | |
| 111 throw new StateError("StreamSink is closed"); | |
| 112 } | |
| 113 if (_controllerInstance == null) { | |
| 114 _controllerInstance = new StreamController<T>(sync: true); | |
| 115 _controllerCompleter = new Completer(); | |
| 116 _target.addStream(_controller.stream) | |
| 117 .then( | |
| 118 (_) { | |
| 119 if (_isBound) { | |
| 120 // A new stream takes over - forward values to that stream. | |
| 121 _controllerCompleter.complete(this); | |
| 122 _controllerCompleter = null; | |
| 123 _controllerInstance = null; | |
| 124 } else { | |
| 125 // No new stream, .close was called. Close _target. | |
| 126 _closeTarget(); | |
| 127 } | |
| 128 }, | |
| 129 onError: (error, stackTrace) { | |
| 130 if (_isBound) { | |
| 131 // A new stream takes over - forward errors to that stream. | |
| 132 _controllerCompleter.completeError(error, stackTrace); | |
| 133 _controllerCompleter = null; | |
| 134 _controllerInstance = null; | |
| 135 } else { | |
| 136 // No new stream. No need to close target, as it have already | |
| 137 // failed. | |
| 138 _completeDoneError(error, stackTrace); | |
| 139 } | |
| 140 }); | |
| 141 } | |
| 142 return _controllerInstance; | |
| 143 } | |
| 144 } | |
| 145 | |
| OLD | NEW |