| Index: lib/src/guarantee_channel.dart
|
| diff --git a/lib/src/guarantee_channel.dart b/lib/src/guarantee_channel.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..849cc2ffe74edc22e0ff79afe65749e0f5636af3
|
| --- /dev/null
|
| +++ b/lib/src/guarantee_channel.dart
|
| @@ -0,0 +1,163 @@
|
| +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +import 'dart:async';
|
| +
|
| +import 'package:async/async.dart';
|
| +
|
| +import '../stream_channel.dart';
|
| +
|
| +/// A [StreamChannel] that enforces the stream channel guarantees.
|
| +///
|
| +/// This is exposed via [new StreamChannel.withGuarantees].
|
| +class GuaranteeChannel<T> extends StreamChannelMixin<T> {
|
| + Stream<T> get stream => _streamController.stream;
|
| +
|
| + StreamSink<T> get sink => _sink;
|
| + _GuaranteeSink<T> _sink;
|
| +
|
| + /// The controller for [stream].
|
| + ///
|
| + /// This intermediate controller allows us to continue listening for a done
|
| + /// event even after the user has canceled their subscription, and to send our
|
| + /// own done event when the sink is closed.
|
| + StreamController<T> _streamController;
|
| +
|
| + /// The subscription to the inner stream.
|
| + StreamSubscription<T> _subscription;
|
| +
|
| + /// Whether the sink has closed, causing the underlying channel to disconnect.
|
| + bool _disconnected = false;
|
| +
|
| + GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
|
| + _sink = new _GuaranteeSink<T>(innerSink, this);
|
| +
|
| + // Enforce the single-subscription guarantee by changing a broadcast stream
|
| + // to single-subscription.
|
| + if (innerStream.isBroadcast) {
|
| + innerStream = innerStream.transform(
|
| + const SingleSubscriptionTransformer());
|
| + }
|
| +
|
| + _streamController = new StreamController<T>(onListen: () {
|
| + // If the sink has disconnected, we've already called
|
| + // [_streamController.close].
|
| + if (_disconnected) return;
|
| +
|
| + _subscription = innerStream.listen(_streamController.add,
|
| + onError: _streamController.addError,
|
| + onDone: () {
|
| + _sink._onStreamDisconnected();
|
| + _streamController.close();
|
| + });
|
| + }, sync: true);
|
| + }
|
| +
|
| + /// Called by [_GuaranteeSink] when the user closes it.
|
| + ///
|
| + /// The sink closing indicates that the connection is closed, so the stream
|
| + /// should stop emitting events.
|
| + void _onSinkDisconnected() {
|
| + _disconnected = true;
|
| + if (_subscription != null) _subscription.cancel();
|
| + _streamController.close();
|
| + }
|
| +}
|
| +
|
| +/// The sink for [GuaranteeChannel].
|
| +///
|
| +/// This wraps the inner sink to ignore events and cancel any in-progress
|
| +/// [addStream] calls when the underlying channel closes.
|
| +class _GuaranteeSink<T> implements StreamSink<T> {
|
| + /// The inner sink being wrapped.
|
| + final StreamSink<T> _inner;
|
| +
|
| + /// The [GuaranteeChannel] this belongs to.
|
| + final GuaranteeChannel<T> _channel;
|
| +
|
| + Future get done => _inner.done;
|
| +
|
| + /// Whether the stream has emitted a done event, causing the underlying
|
| + /// channel to disconnect.
|
| + bool _disconnected = false;
|
| +
|
| + /// Whether the user has called [close].
|
| + bool _closed = false;
|
| +
|
| + /// The subscription to the stream passed to [addStream], if a stream is
|
| + /// currently being added.
|
| + StreamSubscription<T> _addStreamSubscription;
|
| +
|
| + /// The completer for the future returned by [addStream], if a stream is
|
| + /// currently being added.
|
| + Completer _addStreamCompleter;
|
| +
|
| + /// Whether we're currently adding a stream with [addStream].
|
| + bool get _inAddStream => _addStreamSubscription != null;
|
| +
|
| + _GuaranteeSink(this._inner, this._channel);
|
| +
|
| + void add(T data) {
|
| + if (_closed) throw new StateError("Cannot add event after closing.");
|
| + if (_inAddStream) {
|
| + throw new StateError("Cannot add event while adding stream.");
|
| + }
|
| + if (_disconnected) return;
|
| +
|
| + _inner.add(data);
|
| + }
|
| +
|
| + void addError(error, [StackTrace stackTrace]) {
|
| + if (_closed) throw new StateError("Cannot add event after closing.");
|
| + if (_inAddStream) {
|
| + throw new StateError("Cannot add event while adding stream.");
|
| + }
|
| + if (_disconnected) return;
|
| +
|
| + _inner.addError(error, stackTrace);
|
| + }
|
| +
|
| + Future addStream(Stream<T> stream) {
|
| + if (_closed) throw new StateError("Cannot add stream after closing.");
|
| + if (_inAddStream) {
|
| + throw new StateError("Cannot add stream while adding stream.");
|
| + }
|
| + if (_disconnected) return new Future.value();
|
| +
|
| + _addStreamCompleter = new Completer.sync();
|
| + _addStreamSubscription = stream.listen(
|
| + _inner.add,
|
| + onError: _inner.addError,
|
| + onDone: _addStreamCompleter.complete);
|
| + return _addStreamCompleter.future.then((_) {
|
| + _addStreamCompleter = null;
|
| + _addStreamSubscription = null;
|
| + });
|
| + }
|
| +
|
| + Future close() {
|
| + if (_inAddStream) {
|
| + throw new StateError("Cannot close sink while adding stream.");
|
| + }
|
| +
|
| + _closed = true;
|
| + if (_disconnected) return new Future.value();
|
| +
|
| + _channel._onSinkDisconnected();
|
| + return _inner.close();
|
| + }
|
| +
|
| + /// Called by [GuaranteeChannel] when the stream emits a done event.
|
| + ///
|
| + /// The stream being done indicates that the connection is closed, so the
|
| + /// sink should stop forwarding events.
|
| + void _onStreamDisconnected() {
|
| + _disconnected = true;
|
| + if (!_inAddStream) return;
|
| +
|
| + _addStreamCompleter.complete(_addStreamSubscription.cancel());
|
| + _addStreamCompleter = null;
|
| + _addStreamSubscription = null;
|
| + }
|
| +}
|
|
|