OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.isolate; | 5 part of dart.isolate; |
6 | 6 |
7 /** | 7 /** |
8 * The initial [IsolateStream] available by default for this isolate. This | 8 * The initial [IsolateStream] available by default for this isolate. This |
9 * [IsolateStream] is created automatically and it is commonly used to establish | 9 * [IsolateStream] is created automatically and it is commonly used to establish |
10 * the first communication between isolates (see [streamSpawnFunction] and | 10 * the first communication between isolates (see [streamSpawnFunction] and |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
45 | 45 |
46 /** | 46 /** |
47 * [IsolateStream]s, together with [IsolateSink]s, are the only means of | 47 * [IsolateStream]s, together with [IsolateSink]s, are the only means of |
48 * communication between isolates. Each IsolateStream has a corresponding | 48 * communication between isolates. Each IsolateStream has a corresponding |
49 * [IsolateSink]. Any message written into that sink will be delivered to | 49 * [IsolateSink]. Any message written into that sink will be delivered to |
50 * the stream and then dispatched to the stream's subscribers. | 50 * the stream and then dispatched to the stream's subscribers. |
51 */ | 51 */ |
52 class IsolateStream extends Stream<dynamic> { | 52 class IsolateStream extends Stream<dynamic> { |
53 bool _isClosed = false; | 53 bool _isClosed = false; |
54 final ReceivePort _port; | 54 final ReceivePort _port; |
55 StreamController _controller = new StreamController.multiSubscription(); | 55 StreamController _controller = new StreamController.broadcast(); |
56 | 56 |
57 IsolateStream._fromOriginalReceivePort(this._port) { | 57 IsolateStream._fromOriginalReceivePort(this._port) { |
58 _port.receive((message, replyTo) { | 58 _port.receive((message, replyTo) { |
59 assert(replyTo == null); | 59 assert(replyTo == null); |
60 _add(message); | 60 _add(message); |
61 }); | 61 }); |
62 } | 62 } |
63 | 63 |
64 IsolateStream._fromOriginalReceivePortOneShot(this._port) { | 64 IsolateStream._fromOriginalReceivePortOneShot(this._port) { |
65 _port.receive((message, replyTo) { | 65 _port.receive((message, replyTo) { |
(...skipping 22 matching lines...) Expand all Loading... |
88 _isClosed = true; | 88 _isClosed = true; |
89 _port.close(); | 89 _port.close(); |
90 _controller.close(); | 90 _controller.close(); |
91 } | 91 } |
92 } | 92 } |
93 | 93 |
94 StreamSubscription listen(void onData(event), | 94 StreamSubscription listen(void onData(event), |
95 { void onError(AsyncError error), | 95 { void onError(AsyncError error), |
96 void onDone(), | 96 void onDone(), |
97 bool unsubscribeOnError}) { | 97 bool unsubscribeOnError}) { |
98 return _controller.listen(onData, | 98 return _controller.stream.listen(onData, |
99 onError: onError, | 99 onError: onError, |
100 onDone: onDone, | 100 onDone: onDone, |
101 unsubscribeOnError: unsubscribeOnError); | 101 unsubscribeOnError: unsubscribeOnError); |
102 } | 102 } |
103 | 103 |
104 dynamic _unmangleMessage(var message) { | 104 dynamic _unmangleMessage(var message) { |
105 _IsolateDecoder decoder = new _IsolateDecoder( | 105 _IsolateDecoder decoder = new _IsolateDecoder( |
106 _ISOLATE_STREAM_TOKEN, | 106 _ISOLATE_STREAM_TOKEN, |
107 (data) { | 107 (data) { |
108 if (data is! List) return data; | 108 if (data is! List) return data; |
109 if (data.length == 2 && data[0] == "Sink" && data[1] is SendPort) { | 109 if (data.length == 2 && data[0] == "Sink" && data[1] is SendPort) { |
110 return new IsolateSink._fromPort(data[1]); | 110 return new IsolateSink._fromPort(data[1]); |
111 } | 111 } |
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
206 * Creates and spawns an isolate whose code is available at [uri]. Like with | 206 * Creates and spawns an isolate whose code is available at [uri]. Like with |
207 * [streamSpawnFunction], the child isolate will have a default [IsolateStream], | 207 * [streamSpawnFunction], the child isolate will have a default [IsolateStream], |
208 * and a this function returns an [IsolateSink] feeding into it. | 208 * and a this function returns an [IsolateSink] feeding into it. |
209 * | 209 * |
210 * See comments at the top of this library for more details. | 210 * See comments at the top of this library for more details. |
211 */ | 211 */ |
212 IsolateSink streamSpawnUri(String uri) { | 212 IsolateSink streamSpawnUri(String uri) { |
213 SendPort sendPort = spawnUri(uri); | 213 SendPort sendPort = spawnUri(uri); |
214 return new IsolateSink._fromPort(sendPort); | 214 return new IsolateSink._fromPort(sendPort); |
215 } | 215 } |
OLD | NEW |