Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(92)

Side by Side Diff: lib/src/isolate_channel.dart

Issue 1671763002: Make IsolateChannel use StreamChannelCompleter. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/guarantee_channel.dart ('k') | lib/src/isolate_channel/send_port_sink.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 import 'dart:isolate'; 6 import 'dart:isolate';
7 7
8 import 'package:async/async.dart'; 8 import 'package:async/async.dart';
9 import 'package:stack_trace/stack_trace.dart'; 9 import 'package:stack_trace/stack_trace.dart';
10 10
11 import '../stream_channel.dart'; 11 import '../stream_channel.dart';
12 import 'isolate_channel/send_port_sink.dart';
13 12
14 /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair, 13 /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
15 /// presumably with another isolate. 14 /// presumably with another isolate.
16 /// 15 ///
17 /// The remote endpoint doesn't necessarily need to be running an 16 /// The remote endpoint doesn't necessarily need to be running an
18 /// [IsolateChannel]. This can be used with any two ports, although the 17 /// [IsolateChannel]. This can be used with any two ports, although the
19 /// [StreamChannel] semantics mean that this class will treat them as being 18 /// [StreamChannel] semantics mean that this class will treat them as being
20 /// paired (for example, closing the [sink] will cause the [stream] to stop 19 /// paired (for example, closing the [sink] will cause the [stream] to stop
21 /// emitting events). 20 /// emitting events).
22 /// 21 ///
(...skipping 24 matching lines...) Expand all
47 var sinkCompleter = new StreamSinkCompleter<T>(); 46 var sinkCompleter = new StreamSinkCompleter<T>();
48 var channel = new IsolateChannel._( 47 var channel = new IsolateChannel._(
49 streamCompleter.stream, sinkCompleter.sink); 48 streamCompleter.stream, sinkCompleter.sink);
50 49
51 // The first message across the ReceivePort should be a SendPort pointing to 50 // The first message across the ReceivePort should be a SendPort pointing to
52 // the remote end. If it's not, we'll make the stream emit an error 51 // the remote end. If it's not, we'll make the stream emit an error
53 // complaining. 52 // complaining.
54 var subscription; 53 var subscription;
55 subscription = receivePort.listen((message) { 54 subscription = receivePort.listen((message) {
56 if (message is SendPort) { 55 if (message is SendPort) {
57 streamCompleter.setSourceStream( 56 var controller = new StreamChannelController(allowForeignErrors: false);
58 new SubscriptionStream<T>(subscription)); 57 new SubscriptionStream<T>(subscription).pipe(controller.local.sink);
59 sinkCompleter.setDestinationSink( 58 controller.local.stream.listen(message.send,
60 new SendPortSink<T>(receivePort, message)); 59 onDone: receivePort.close);
60
61 streamCompleter.setSourceStream(controller.foreign.stream);
62 sinkCompleter.setDestinationSink(controller.foreign.sink);
61 return; 63 return;
62 } 64 }
63 65
64 streamCompleter.setError( 66 streamCompleter.setError(
65 new StateError('Unexpected Isolate response "$message".'), 67 new StateError('Unexpected Isolate response "$message".'),
66 new Trace.current()); 68 new Trace.current());
67 sinkCompleter.setDestinationSink(new NullStreamSink<T>()); 69 sinkCompleter.setDestinationSink(new NullStreamSink<T>());
68 subscription.cancel(); 70 subscription.cancel();
69 }); 71 });
70 72
(...skipping 10 matching lines...) Expand all
81 /// The connection protocol is guaranteed to remain compatible across versions 83 /// The connection protocol is guaranteed to remain compatible across versions
82 /// at least until the next major version release. 84 /// at least until the next major version release.
83 factory IsolateChannel.connectSend(SendPort sendPort) { 85 factory IsolateChannel.connectSend(SendPort sendPort) {
84 var receivePort = new ReceivePort(); 86 var receivePort = new ReceivePort();
85 sendPort.send(receivePort.sendPort); 87 sendPort.send(receivePort.sendPort);
86 return new IsolateChannel(receivePort, sendPort); 88 return new IsolateChannel(receivePort, sendPort);
87 } 89 }
88 90
89 /// Creates a stream channel that receives messages from [receivePort] and 91 /// Creates a stream channel that receives messages from [receivePort] and
90 /// sends them over [sendPort]. 92 /// sends them over [sendPort].
91 IsolateChannel(ReceivePort receivePort, SendPort sendPort) 93 factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) {
92 : stream = new StreamView<T>(receivePort), 94 var controller = new StreamChannelController(allowForeignErrors: false);
93 sink = new SendPortSink<T>(receivePort, sendPort); 95 receivePort.pipe(controller.local.sink);
96 controller.local.stream.listen(sendPort.send, onDone: receivePort.close);
97 return new IsolateChannel._(
98 controller.foreign.stream, controller.foreign.sink);
99 }
94 100
95 IsolateChannel._(this.stream, this.sink); 101 IsolateChannel._(this.stream, this.sink);
96 } 102 }
OLDNEW
« no previous file with comments | « lib/src/guarantee_channel.dart ('k') | lib/src/isolate_channel/send_port_sink.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698