Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(796)

Side by Side Diff: lib/src/isolate_channel.dart

Issue 1690473002: Make IsolateChannel's controllers synchronous. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: . Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 import 'dart:isolate'; 6 import 'dart:isolate';
7 7
8 import 'package:async/async.dart'; 8 import 'package:async/async.dart';
9 import 'package:stack_trace/stack_trace.dart'; 9 import 'package:stack_trace/stack_trace.dart';
10 10
11 import '../stream_channel.dart'; 11 import '../stream_channel.dart';
12 12
13 import 'dart:io';
14
13 /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair, 15 /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
14 /// presumably with another isolate. 16 /// presumably with another isolate.
15 /// 17 ///
16 /// The remote endpoint doesn't necessarily need to be running an 18 /// The remote endpoint doesn't necessarily need to be running an
17 /// [IsolateChannel]. This can be used with any two ports, although the 19 /// [IsolateChannel]. This can be used with any two ports, although the
18 /// [StreamChannel] semantics mean that this class will treat them as being 20 /// [StreamChannel] semantics mean that this class will treat them as being
19 /// paired (for example, closing the [sink] will cause the [stream] to stop 21 /// paired (for example, closing the [sink] will cause the [stream] to stop
20 /// emitting events). 22 /// emitting events).
21 /// 23 ///
22 /// The underlying isolate ports have no notion of closing connections. This 24 /// The underlying isolate ports have no notion of closing connections. This
(...skipping 23 matching lines...) Expand all
46 var sinkCompleter = new StreamSinkCompleter<T>(); 48 var sinkCompleter = new StreamSinkCompleter<T>();
47 var channel = new IsolateChannel._( 49 var channel = new IsolateChannel._(
48 streamCompleter.stream, sinkCompleter.sink); 50 streamCompleter.stream, sinkCompleter.sink);
49 51
50 // The first message across the ReceivePort should be a SendPort pointing to 52 // The first message across the ReceivePort should be a SendPort pointing to
51 // the remote end. If it's not, we'll make the stream emit an error 53 // the remote end. If it's not, we'll make the stream emit an error
52 // complaining. 54 // complaining.
53 var subscription; 55 var subscription;
54 subscription = receivePort.listen((message) { 56 subscription = receivePort.listen((message) {
55 if (message is SendPort) { 57 if (message is SendPort) {
56 var controller = new StreamChannelController(allowForeignErrors: false); 58 var controller = new StreamChannelController(
59 allowForeignErrors: false, sync: true);
57 new SubscriptionStream<T>(subscription).pipe(controller.local.sink); 60 new SubscriptionStream<T>(subscription).pipe(controller.local.sink);
58 controller.local.stream.listen(message.send, 61 controller.local.stream.listen(message.send,
59 onDone: receivePort.close); 62 onDone: receivePort.close);
60 63
61 streamCompleter.setSourceStream(controller.foreign.stream); 64 streamCompleter.setSourceStream(controller.foreign.stream);
62 sinkCompleter.setDestinationSink(controller.foreign.sink); 65 sinkCompleter.setDestinationSink(controller.foreign.sink);
63 return; 66 return;
64 } 67 }
65 68
66 streamCompleter.setError( 69 streamCompleter.setError(
(...skipping 17 matching lines...) Expand all
84 /// at least until the next major version release. 87 /// at least until the next major version release.
85 factory IsolateChannel.connectSend(SendPort sendPort) { 88 factory IsolateChannel.connectSend(SendPort sendPort) {
86 var receivePort = new ReceivePort(); 89 var receivePort = new ReceivePort();
87 sendPort.send(receivePort.sendPort); 90 sendPort.send(receivePort.sendPort);
88 return new IsolateChannel(receivePort, sendPort); 91 return new IsolateChannel(receivePort, sendPort);
89 } 92 }
90 93
91 /// Creates a stream channel that receives messages from [receivePort] and 94 /// Creates a stream channel that receives messages from [receivePort] and
92 /// sends them over [sendPort]. 95 /// sends them over [sendPort].
93 factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) { 96 factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) {
94 var controller = new StreamChannelController(allowForeignErrors: false); 97 var controller = new StreamChannelController(
98 allowForeignErrors: false, sync: true);
95 receivePort.pipe(controller.local.sink); 99 receivePort.pipe(controller.local.sink);
96 controller.local.stream.listen(sendPort.send, onDone: receivePort.close); 100 controller.local.stream.listen(sendPort.send, onDone: receivePort.close);
97 return new IsolateChannel._( 101 return new IsolateChannel._(
98 controller.foreign.stream, controller.foreign.sink); 102 controller.foreign.stream, controller.foreign.sink);
99 } 103 }
100 104
101 IsolateChannel._(this.stream, this.sink); 105 IsolateChannel._(this.stream, this.sink);
102 } 106 }
OLDNEW
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698