Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(533)

Side by Side Diff: lib/src/isolate_channel.dart

Issue 1638183002: Add IsolateChannel.connect* constructors. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | lib/src/isolate_channel/send_port_sink.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 import 'dart:isolate'; 6 import 'dart:isolate';
7 7
8 import 'package:async/async.dart';
9 import 'package:stack_trace/stack_trace.dart';
10
8 import '../stream_channel.dart'; 11 import '../stream_channel.dart';
12 import 'isolate_channel/send_port_sink.dart';
9 13
10 /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair, 14 /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
11 /// presumably with another isolate. 15 /// presumably with another isolate.
12 /// 16 ///
13 /// The remote endpoint doesn't necessarily need to be running an 17 /// The remote endpoint doesn't necessarily need to be running an
14 /// [IsolateChannel]. This can be used with any two ports, although the 18 /// [IsolateChannel]. This can be used with any two ports, although the
15 /// [StreamChannel] semantics mean that this class will treat them as being 19 /// [StreamChannel] semantics mean that this class will treat them as being
16 /// paired (for example, closing the [sink] will cause the [stream] to stop 20 /// paired (for example, closing the [sink] will cause the [stream] to stop
17 /// emitting events). 21 /// emitting events).
18 /// 22 ///
19 /// The underlying isolate ports have no notion of closing connections. This 23 /// The underlying isolate ports have no notion of closing connections. This
20 /// means that [stream] won't close unless [sink] is closed, and that closing 24 /// means that [stream] won't close unless [sink] is closed, and that closing
21 /// [sink] won't cause the remote endpoint to close. Users should take care to 25 /// [sink] won't cause the remote endpoint to close. Users should take care to
22 /// ensure that they always close the [sink] of every [IsolateChannel] they use 26 /// ensure that they always close the [sink] of every [IsolateChannel] they use
23 /// to avoid leaving dangling [ReceivePort]s. 27 /// to avoid leaving dangling [ReceivePort]s.
24 class IsolateChannel<T> extends StreamChannelMixin<T> { 28 class IsolateChannel<T> extends StreamChannelMixin<T> {
25 /// The port that produces incoming messages. 29 final Stream<T> stream;
30 final StreamSink<T> sink;
31
32 /// Connects to a remote channel that was created with
33 /// [IsolateChannel.connectSend].
26 /// 34 ///
27 /// This is wrapped in a [StreamView] to produce [stream]. 35 /// These constructors establish a connection using only a single
28 final ReceivePort _receivePort; 36 /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the
37 /// connect constructors.
38 ///
39 /// The connection protocol is guaranteed to remain compatible across versions
40 /// at least until the next major version release. If the protocol is
41 /// violated, the resulting channel will emit a single value on its stream and
42 /// then close.
43 factory IsolateChannel.connectReceive(ReceivePort receivePort) {
44 // We can't use a [StreamChannelCompleter] here because we need the return
45 // value to be an [IsolateChannel].
46 var streamCompleter = new StreamCompleter<T>();
47 var sinkCompleter = new StreamSinkCompleter<T>();
48 var channel = new IsolateChannel._(
49 streamCompleter.stream, sinkCompleter.sink);
29 50
30 /// The port that sends outgoing messages. 51 // The first message across the ReceivePort should be a SendPort pointing to
31 final SendPort _sendPort; 52 // the remote end. If it's not, we'll make the stream emit an error
53 // complaining.
54 var subscription;
55 subscription = receivePort.listen((message) {
56 if (message is SendPort) {
57 streamCompleter.setSourceStream(
58 new SubscriptionStream<T>(subscription));
59 sinkCompleter.setDestinationSink(
60 new SendPortSink<T>(receivePort, message));
61 return;
62 }
32 63
33 Stream<T> get stream => _stream; 64 streamCompleter.setError(
34 final Stream<T> _stream; 65 new StateError('Unexpected Isolate response "$message".'),
66 new Trace.current());
67 sinkCompleter.setDestinationSink(new NullStreamSink<T>());
68 subscription.cancel();
69 });
35 70
36 StreamSink<T> get sink => _sink; 71 return channel;
37 _SendPortSink<T> _sink; 72 }
73
74 /// Connects to a remote channel that was created with
75 /// [IsolateChannel.connectReceive].
76 ///
77 /// These constructors establish a connection using only a single
78 /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the
79 /// connect constructors.
80 ///
81 /// The connection protocol is guaranteed to remain compatible across versions
82 /// at least until the next major version release.
83 factory IsolateChannel.connectSend(SendPort sendPort) {
84 var receivePort = new ReceivePort();
Bob Nystrom 2016/01/27 20:39:47 Go home, Reitveld diff algorithm, you are drunk.
85 sendPort.send(receivePort.sendPort);
86 return new IsolateChannel(receivePort, sendPort);
87 }
38 88
39 /// Creates a stream channel that receives messages from [receivePort] and 89 /// Creates a stream channel that receives messages from [receivePort] and
40 /// sends them over [sendPort]. 90 /// sends them over [sendPort].
41 IsolateChannel(ReceivePort receivePort, this._sendPort) 91 IsolateChannel(ReceivePort receivePort, SendPort sendPort)
42 : _receivePort = receivePort, 92 : stream = new StreamView<T>(receivePort),
43 _stream = new StreamView<T>(receivePort) { 93 sink = new SendPortSink<T>(receivePort, sendPort);
44 _sink = new _SendPortSink<T>(this); 94
45 } 95 IsolateChannel._(this.stream, this.sink);
46 } 96 }
47
48 /// The sink for [IsolateChannel].
49 ///
50 /// [SendPort] doesn't natively implement any sink API, so this adds that API as
51 /// a wrapper. Closing this just closes the [ReceivePort].
52 class _SendPortSink<T> implements StreamSink<T> {
53 /// The channel that this sink is for.
54 final IsolateChannel _channel;
55
56 Future get done => _doneCompleter.future;
57 final _doneCompleter = new Completer();
58
59 /// Whether [done] has been completed.
60 ///
61 /// This is distinct from [_closed] because [done] can complete with an error
62 /// without the user explicitly calling [close].
63 bool get _isDone => _doneCompleter.isCompleted;
64
65 /// Whether the user has called [close].
66 bool _closed = false;
67
68 /// Whether we're currently adding a stream with [addStream].
69 bool _inAddStream = false;
70
71 _SendPortSink(this._channel);
72
73 void add(T data) {
74 if (_closed) throw new StateError("Cannot add event after closing.");
75 if (_inAddStream) {
76 throw new StateError("Cannot add event while adding stream.");
77 }
78 if (_isDone) return;
79
80 _add(data);
81 }
82
83 /// A helper for [add] that doesn't check for [StateError]s.
84 ///
85 /// This is called from [addStream], so it shouldn't check [_inAddStream].
86 void _add(T data) {
87 _channel._sendPort.send(data);
88 }
89
90 void addError(error, [StackTrace stackTrace]) {
91 if (_closed) throw new StateError("Cannot add event after closing.");
92 if (_inAddStream) {
93 throw new StateError("Cannot add event while adding stream.");
94 }
95
96 _close(error, stackTrace);
97 }
98
99 Future close() {
100 if (_inAddStream) {
101 throw new StateError("Cannot close sink while adding stream.");
102 }
103
104 _closed = true;
105 return _close();
106 }
107
108 /// A helper for [close] that doesn't check for [StateError]s.
109 ///
110 /// This is called from [addStream], so it shouldn't check [_inAddStream]. It
111 /// also forwards [error] and [stackTrace] to [done] if they're passed.
112 Future _close([error, StackTrace stackTrace]) {
113 if (_isDone) return done;
114
115 _channel._receivePort.close();
116
117 if (error != null) {
118 _doneCompleter.completeError(error, stackTrace);
119 } else {
120 _doneCompleter.complete();
121 }
122
123 return done;
124 }
125
126 Future addStream(Stream<T> stream) {
127 if (_closed) throw new StateError("Cannot add stream after closing.");
128 if (_inAddStream) {
129 throw new StateError("Cannot add stream while adding stream.");
130 }
131 if (_isDone) return;
132
133 _inAddStream = true;
134 var completer = new Completer.sync();
135 stream.listen(_add,
136 onError: (error, stackTrace) {
137 _close(error, stackTrace);
138 completer.complete();
139 },
140 onDone: completer.complete,
141 cancelOnError: true);
142 return completer.future.then((_) {
143 _inAddStream = false;
144 });
145 }
146 }
OLDNEW
« no previous file with comments | « no previous file | lib/src/isolate_channel/send_port_sink.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698