OLD | NEW |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 import 'dart:isolate'; | 6 import 'dart:isolate'; |
7 | 7 |
8 import '../stream_channel.dart'; | 8 /// The sink for [IsolateChannel]. |
9 | |
10 /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair, | |
11 /// presumably with another isolate. | |
12 /// | 9 /// |
13 /// The remote endpoint doesn't necessarily need to be running an | 10 /// [SendPort] doesn't natively implement any sink API, so this adds that API as |
14 /// [IsolateChannel]. This can be used with any two ports, although the | 11 /// a wrapper. Closing this just closes the [ReceivePort]. |
15 /// [StreamChannel] semantics mean that this class will treat them as being | 12 class SendPortSink<T> implements StreamSink<T> { |
16 /// paired (for example, closing the [sink] will cause the [stream] to stop | |
17 /// emitting events). | |
18 /// | |
19 /// The underlying isolate ports have no notion of closing connections. This | |
20 /// means that [stream] won't close unless [sink] is closed, and that closing | |
21 /// [sink] won't cause the remote endpoint to close. Users should take care to | |
22 /// ensure that they always close the [sink] of every [IsolateChannel] they use | |
23 /// to avoid leaving dangling [ReceivePort]s. | |
24 class IsolateChannel<T> extends StreamChannelMixin<T> { | |
25 /// The port that produces incoming messages. | 13 /// The port that produces incoming messages. |
26 /// | 14 /// |
27 /// This is wrapped in a [StreamView] to produce [stream]. | 15 /// This is wrapped in a [StreamView] to produce [stream]. |
28 final ReceivePort _receivePort; | 16 final ReceivePort _receivePort; |
29 | 17 |
30 /// The port that sends outgoing messages. | 18 /// The port that sends outgoing messages. |
31 final SendPort _sendPort; | 19 final SendPort _sendPort; |
32 | 20 |
33 Stream<T> get stream => _stream; | |
34 final Stream<T> _stream; | |
35 | |
36 StreamSink<T> get sink => _sink; | |
37 _SendPortSink<T> _sink; | |
38 | |
39 /// Creates a stream channel that receives messages from [receivePort] and | |
40 /// sends them over [sendPort]. | |
41 IsolateChannel(ReceivePort receivePort, this._sendPort) | |
42 : _receivePort = receivePort, | |
43 _stream = new StreamView<T>(receivePort) { | |
44 _sink = new _SendPortSink<T>(this); | |
45 } | |
46 } | |
47 | |
48 /// The sink for [IsolateChannel]. | |
49 /// | |
50 /// [SendPort] doesn't natively implement any sink API, so this adds that API as | |
51 /// a wrapper. Closing this just closes the [ReceivePort]. | |
52 class _SendPortSink<T> implements StreamSink<T> { | |
53 /// The channel that this sink is for. | |
54 final IsolateChannel _channel; | |
55 | |
56 Future get done => _doneCompleter.future; | 21 Future get done => _doneCompleter.future; |
57 final _doneCompleter = new Completer(); | 22 final _doneCompleter = new Completer(); |
58 | 23 |
59 /// Whether [done] has been completed. | 24 /// Whether [done] has been completed. |
60 /// | 25 /// |
61 /// This is distinct from [_closed] because [done] can complete with an error | 26 /// This is distinct from [_closed] because [done] can complete with an error |
62 /// without the user explicitly calling [close]. | 27 /// without the user explicitly calling [close]. |
63 bool get _isDone => _doneCompleter.isCompleted; | 28 bool get _isDone => _doneCompleter.isCompleted; |
64 | 29 |
65 /// Whether the user has called [close]. | 30 /// Whether the user has called [close]. |
66 bool _closed = false; | 31 bool _closed = false; |
67 | 32 |
68 /// Whether we're currently adding a stream with [addStream]. | 33 /// Whether we're currently adding a stream with [addStream]. |
69 bool _inAddStream = false; | 34 bool _inAddStream = false; |
70 | 35 |
71 _SendPortSink(this._channel); | 36 SendPortSink(this._receivePort, this._sendPort); |
72 | 37 |
73 void add(T data) { | 38 void add(T data) { |
74 if (_closed) throw new StateError("Cannot add event after closing."); | 39 if (_closed) throw new StateError("Cannot add event after closing."); |
75 if (_inAddStream) { | 40 if (_inAddStream) { |
76 throw new StateError("Cannot add event while adding stream."); | 41 throw new StateError("Cannot add event while adding stream."); |
77 } | 42 } |
78 if (_isDone) return; | 43 if (_isDone) return; |
79 | 44 |
80 _add(data); | 45 _add(data); |
81 } | 46 } |
82 | 47 |
83 /// A helper for [add] that doesn't check for [StateError]s. | 48 /// A helper for [add] that doesn't check for [StateError]s. |
84 /// | 49 /// |
85 /// This is called from [addStream], so it shouldn't check [_inAddStream]. | 50 /// This is called from [addStream], so it shouldn't check [_inAddStream]. |
86 void _add(T data) { | 51 void _add(T data) { |
87 _channel._sendPort.send(data); | 52 _sendPort.send(data); |
88 } | 53 } |
89 | 54 |
90 void addError(error, [StackTrace stackTrace]) { | 55 void addError(error, [StackTrace stackTrace]) { |
91 if (_closed) throw new StateError("Cannot add event after closing."); | 56 if (_closed) throw new StateError("Cannot add event after closing."); |
92 if (_inAddStream) { | 57 if (_inAddStream) { |
93 throw new StateError("Cannot add event while adding stream."); | 58 throw new StateError("Cannot add event while adding stream."); |
94 } | 59 } |
95 | 60 |
96 _close(error, stackTrace); | 61 _close(error, stackTrace); |
97 } | 62 } |
98 | 63 |
99 Future close() { | 64 Future close() { |
100 if (_inAddStream) { | 65 if (_inAddStream) { |
101 throw new StateError("Cannot close sink while adding stream."); | 66 throw new StateError("Cannot close sink while adding stream."); |
102 } | 67 } |
103 | 68 |
104 _closed = true; | 69 _closed = true; |
105 return _close(); | 70 return _close(); |
106 } | 71 } |
107 | 72 |
108 /// A helper for [close] that doesn't check for [StateError]s. | 73 /// A helper for [close] that doesn't check for [StateError]s. |
109 /// | 74 /// |
110 /// This is called from [addStream], so it shouldn't check [_inAddStream]. It | 75 /// This is called from [addStream], so it shouldn't check [_inAddStream]. It |
111 /// also forwards [error] and [stackTrace] to [done] if they're passed. | 76 /// also forwards [error] and [stackTrace] to [done] if they're passed. |
112 Future _close([error, StackTrace stackTrace]) { | 77 Future _close([error, StackTrace stackTrace]) { |
113 if (_isDone) return done; | 78 if (_isDone) return done; |
114 | 79 |
115 _channel._receivePort.close(); | 80 _receivePort.close(); |
116 | 81 |
117 if (error != null) { | 82 if (error != null) { |
118 _doneCompleter.completeError(error, stackTrace); | 83 _doneCompleter.completeError(error, stackTrace); |
119 } else { | 84 } else { |
120 _doneCompleter.complete(); | 85 _doneCompleter.complete(); |
121 } | 86 } |
122 | 87 |
123 return done; | 88 return done; |
124 } | 89 } |
125 | 90 |
126 Future addStream(Stream<T> stream) { | 91 Future addStream(Stream<T> stream) { |
127 if (_closed) throw new StateError("Cannot add stream after closing."); | 92 if (_closed) throw new StateError("Cannot add stream after closing."); |
128 if (_inAddStream) { | 93 if (_inAddStream) { |
129 throw new StateError("Cannot add stream while adding stream."); | 94 throw new StateError("Cannot add stream while adding stream."); |
130 } | 95 } |
131 if (_isDone) return; | 96 if (_isDone) return new Future.value(); |
132 | 97 |
133 _inAddStream = true; | 98 _inAddStream = true; |
134 var completer = new Completer.sync(); | 99 var completer = new Completer.sync(); |
135 stream.listen(_add, | 100 stream.listen(_add, |
136 onError: (error, stackTrace) { | 101 onError: (error, stackTrace) { |
137 _close(error, stackTrace); | 102 _close(error, stackTrace); |
138 completer.complete(); | 103 completer.complete(); |
139 }, | 104 }, |
140 onDone: completer.complete, | 105 onDone: completer.complete, |
141 cancelOnError: true); | 106 cancelOnError: true); |
142 return completer.future.then((_) { | 107 return completer.future.then((_) { |
143 _inAddStream = false; | 108 _inAddStream = false; |
144 }); | 109 }); |
145 } | 110 } |
146 } | 111 } |
OLD | NEW |