Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(100)

Unified Diff: lib/src/isolate_channel/send_port_sink.dart

Issue 1638183002: Add IsolateChannel.connect* constructors. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/isolate_channel/send_port_sink.dart
diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel/send_port_sink.dart
similarity index 65%
copy from lib/src/isolate_channel.dart
copy to lib/src/isolate_channel/send_port_sink.dart
index c6645438d6b02daf99ff7d916df884cabc2d59e5..d98f1da65db04e2ee798622c6d32c1ed5ebef707 100644
--- a/lib/src/isolate_channel.dart
+++ b/lib/src/isolate_channel/send_port_sink.dart
@@ -5,23 +5,11 @@
import 'dart:async';
import 'dart:isolate';
-import '../stream_channel.dart';
-
-/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
-/// presumably with another isolate.
-///
-/// The remote endpoint doesn't necessarily need to be running an
-/// [IsolateChannel]. This can be used with any two ports, although the
-/// [StreamChannel] semantics mean that this class will treat them as being
-/// paired (for example, closing the [sink] will cause the [stream] to stop
-/// emitting events).
+/// The sink for [IsolateChannel].
///
-/// The underlying isolate ports have no notion of closing connections. This
-/// means that [stream] won't close unless [sink] is closed, and that closing
-/// [sink] won't cause the remote endpoint to close. Users should take care to
-/// ensure that they always close the [sink] of every [IsolateChannel] they use
-/// to avoid leaving dangling [ReceivePort]s.
-class IsolateChannel<T> extends StreamChannelMixin<T> {
+/// [SendPort] doesn't natively implement any sink API, so this adds that API as
+/// a wrapper. Closing this just closes the [ReceivePort].
+class SendPortSink<T> implements StreamSink<T> {
/// The port that produces incoming messages.
///
/// This is wrapped in a [StreamView] to produce [stream].
@@ -30,29 +18,6 @@ class IsolateChannel<T> extends StreamChannelMixin<T> {
/// The port that sends outgoing messages.
final SendPort _sendPort;
- Stream<T> get stream => _stream;
- final Stream<T> _stream;
-
- StreamSink<T> get sink => _sink;
- _SendPortSink<T> _sink;
-
- /// Creates a stream channel that receives messages from [receivePort] and
- /// sends them over [sendPort].
- IsolateChannel(ReceivePort receivePort, this._sendPort)
- : _receivePort = receivePort,
- _stream = new StreamView<T>(receivePort) {
- _sink = new _SendPortSink<T>(this);
- }
-}
-
-/// The sink for [IsolateChannel].
-///
-/// [SendPort] doesn't natively implement any sink API, so this adds that API as
-/// a wrapper. Closing this just closes the [ReceivePort].
-class _SendPortSink<T> implements StreamSink<T> {
- /// The channel that this sink is for.
- final IsolateChannel _channel;
-
Future get done => _doneCompleter.future;
final _doneCompleter = new Completer();
@@ -68,7 +33,7 @@ class _SendPortSink<T> implements StreamSink<T> {
/// Whether we're currently adding a stream with [addStream].
bool _inAddStream = false;
- _SendPortSink(this._channel);
+ SendPortSink(this._receivePort, this._sendPort);
void add(T data) {
if (_closed) throw new StateError("Cannot add event after closing.");
@@ -84,7 +49,7 @@ class _SendPortSink<T> implements StreamSink<T> {
///
/// This is called from [addStream], so it shouldn't check [_inAddStream].
void _add(T data) {
- _channel._sendPort.send(data);
+ _sendPort.send(data);
}
void addError(error, [StackTrace stackTrace]) {
@@ -112,7 +77,7 @@ class _SendPortSink<T> implements StreamSink<T> {
Future _close([error, StackTrace stackTrace]) {
if (_isDone) return done;
- _channel._receivePort.close();
+ _receivePort.close();
if (error != null) {
_doneCompleter.completeError(error, stackTrace);
@@ -128,7 +93,7 @@ class _SendPortSink<T> implements StreamSink<T> {
if (_inAddStream) {
throw new StateError("Cannot add stream while adding stream.");
}
- if (_isDone) return;
+ if (_isDone) return new Future.value();
_inAddStream = true;
var completer = new Completer.sync();
« lib/src/isolate_channel.dart ('K') | « lib/src/isolate_channel.dart ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698