OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 import 'dart:async'; |
| 6 import 'dart:isolate'; |
| 7 |
| 8 import '../stream_channel.dart'; |
| 9 |
| 10 /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair, |
| 11 /// presumably with another isolate. |
| 12 /// |
| 13 /// The remote endpoint doesn't necessarily need to be running an |
| 14 /// [IsolateChannel]. This can be used with any two ports, although the |
| 15 /// [StreamChannel] semantics mean that this class will treat them as being |
| 16 /// paired (for example, closing the [sink] will cause the [stream] to stop |
| 17 /// emitting events). |
| 18 /// |
| 19 /// The underlying isolate ports have no notion of closing connections. This |
| 20 /// means that [stream] won't close unless [sink] is closed, and that closing |
| 21 /// [sink] won't cause the remote endpoint to close. Users should take care to |
| 22 /// ensure that they always close the [sink] of every [IsolateChannel] they use |
| 23 /// to avoid leaving dangling [ReceivePort]s. |
| 24 class IsolateChannel<T> extends StreamChannelMixin<T> { |
| 25 /// The port that produces incoming messages. |
| 26 /// |
| 27 /// This is wrapped in a [StreamView] to produce [stream]. |
| 28 final ReceivePort _receivePort; |
| 29 |
| 30 /// The port that sends outgoing messages. |
| 31 final SendPort _sendPort; |
| 32 |
| 33 Stream<T> get stream => _stream; |
| 34 final Stream<T> _stream; |
| 35 |
| 36 StreamSink<T> get sink => _sink; |
| 37 _SendPortSink<T> _sink; |
| 38 |
| 39 /// Creates a stream channel that receives messages from [receivePort] and |
| 40 /// sends them over [sendPort]. |
| 41 IsolateChannel(ReceivePort receivePort, this._sendPort) |
| 42 : _receivePort = receivePort, |
| 43 _stream = new StreamView<T>(receivePort) { |
| 44 _sink = new _SendPortSink<T>(this); |
| 45 } |
| 46 } |
| 47 |
| 48 /// The sink for [IsolateChannel]. |
| 49 /// |
| 50 /// [SendPort] doesn't natively implement any sink API, so this adds that API as |
| 51 /// a wrapper. Closing this just closes the [ReceivePort]. |
| 52 class _SendPortSink<T> implements StreamSink<T> { |
| 53 /// The channel that this sink is for. |
| 54 final IsolateChannel _channel; |
| 55 |
| 56 Future get done => _doneCompleter.future; |
| 57 final _doneCompleter = new Completer(); |
| 58 |
| 59 /// Whether [done] has been completed. |
| 60 /// |
| 61 /// This is distinct from [_closed] because [done] can complete with an error |
| 62 /// without the user explicitly calling [close]. |
| 63 bool get _isDone => _doneCompleter.isCompleted; |
| 64 |
| 65 /// Whether the user has called [close]. |
| 66 bool _closed = false; |
| 67 |
| 68 /// Whether we're currently adding a stream with [addStream]. |
| 69 bool _inAddStream = false; |
| 70 |
| 71 _SendPortSink(this._channel); |
| 72 |
| 73 void add(T data) { |
| 74 if (_closed) throw new StateError("Cannot add event after closing."); |
| 75 if (_inAddStream) { |
| 76 throw new StateError("Cannot add event while adding stream."); |
| 77 } |
| 78 if (_isDone) return; |
| 79 |
| 80 _add(data); |
| 81 } |
| 82 |
| 83 /// A helper for [add] that doesn't check for [StateError]s. |
| 84 /// |
| 85 /// This is called from [addStream], so it shouldn't check [_inAddStream]. |
| 86 void _add(T data) { |
| 87 _channel._sendPort.send(data); |
| 88 } |
| 89 |
| 90 void addError(error, [StackTrace stackTrace]) { |
| 91 if (_closed) throw new StateError("Cannot add event after closing."); |
| 92 if (_inAddStream) { |
| 93 throw new StateError("Cannot add event while adding stream."); |
| 94 } |
| 95 |
| 96 _close(error, stackTrace); |
| 97 } |
| 98 |
| 99 Future close() { |
| 100 if (_inAddStream) { |
| 101 throw new StateError("Cannot close sink while adding stream."); |
| 102 } |
| 103 |
| 104 _closed = true; |
| 105 return _close(); |
| 106 } |
| 107 |
| 108 /// A helper for [close] that doesn't check for [StateError]s. |
| 109 /// |
| 110 /// This is called from [addStream], so it shouldn't check [_inAddStream]. It |
| 111 /// also forwards [error] and [stackTrace] to [done] if they're passed. |
| 112 Future _close([error, StackTrace stackTrace]) { |
| 113 if (_isDone) return done; |
| 114 |
| 115 _channel._receivePort.close(); |
| 116 |
| 117 if (error != null) { |
| 118 _doneCompleter.completeError(error, stackTrace); |
| 119 } else { |
| 120 _doneCompleter.complete(); |
| 121 } |
| 122 |
| 123 return done; |
| 124 } |
| 125 |
| 126 Future addStream(Stream<T> stream) { |
| 127 if (_closed) throw new StateError("Cannot add stream after closing."); |
| 128 if (_inAddStream) { |
| 129 throw new StateError("Cannot add stream while adding stream."); |
| 130 } |
| 131 if (_isDone) return; |
| 132 |
| 133 _inAddStream = true; |
| 134 var completer = new Completer.sync(); |
| 135 stream.listen(_add, |
| 136 onError: (error, stackTrace) { |
| 137 _close(error, stackTrace); |
| 138 completer.complete(); |
| 139 }, |
| 140 onDone: completer.complete, |
| 141 cancelOnError: true); |
| 142 return completer.future.then((_) { |
| 143 _inAddStream = false; |
| 144 }); |
| 145 } |
| 146 } |
OLD | NEW |