Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(694)

Side by Side Diff: lib/src/isolate_channel/send_port_sink.dart

Issue 1671763002: Make IsolateChannel use StreamChannelCompleter. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/isolate_channel.dart ('k') | lib/src/stream_channel_controller.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import 'dart:async';
6 import 'dart:isolate';
7
8 /// The sink for [IsolateChannel].
9 ///
10 /// [SendPort] doesn't natively implement any sink API, so this adds that API as
11 /// a wrapper. Closing this just closes the [ReceivePort].
12 class SendPortSink<T> implements StreamSink<T> {
13 /// The port that produces incoming messages.
14 ///
15 /// This is wrapped in a [StreamView] to produce [stream].
16 final ReceivePort _receivePort;
17
18 /// The port that sends outgoing messages.
19 final SendPort _sendPort;
20
21 Future get done => _doneCompleter.future;
22 final _doneCompleter = new Completer();
23
24 /// Whether [done] has been completed.
25 ///
26 /// This is distinct from [_closed] because [done] can complete with an error
27 /// without the user explicitly calling [close].
28 bool get _isDone => _doneCompleter.isCompleted;
29
30 /// Whether the user has called [close].
31 bool _closed = false;
32
33 /// Whether we're currently adding a stream with [addStream].
34 bool _inAddStream = false;
35
36 SendPortSink(this._receivePort, this._sendPort);
37
38 void add(T data) {
39 if (_closed) throw new StateError("Cannot add event after closing.");
40 if (_inAddStream) {
41 throw new StateError("Cannot add event while adding stream.");
42 }
43 if (_isDone) return;
44
45 _add(data);
46 }
47
48 /// A helper for [add] that doesn't check for [StateError]s.
49 ///
50 /// This is called from [addStream], so it shouldn't check [_inAddStream].
51 void _add(T data) {
52 _sendPort.send(data);
53 }
54
55 void addError(error, [StackTrace stackTrace]) {
56 if (_closed) throw new StateError("Cannot add event after closing.");
57 if (_inAddStream) {
58 throw new StateError("Cannot add event while adding stream.");
59 }
60
61 _close(error, stackTrace);
62 }
63
64 Future close() {
65 if (_inAddStream) {
66 throw new StateError("Cannot close sink while adding stream.");
67 }
68
69 _closed = true;
70 return _close();
71 }
72
73 /// A helper for [close] that doesn't check for [StateError]s.
74 ///
75 /// This is called from [addStream], so it shouldn't check [_inAddStream]. It
76 /// also forwards [error] and [stackTrace] to [done] if they're passed.
77 Future _close([error, StackTrace stackTrace]) {
78 if (_isDone) return done;
79
80 _receivePort.close();
81
82 if (error != null) {
83 _doneCompleter.completeError(error, stackTrace);
84 } else {
85 _doneCompleter.complete();
86 }
87
88 return done;
89 }
90
91 Future addStream(Stream<T> stream) {
92 if (_closed) throw new StateError("Cannot add stream after closing.");
93 if (_inAddStream) {
94 throw new StateError("Cannot add stream while adding stream.");
95 }
96 if (_isDone) return new Future.value();
97
98 _inAddStream = true;
99 var completer = new Completer.sync();
100 stream.listen(_add,
101 onError: (error, stackTrace) {
102 _close(error, stackTrace);
103 completer.complete();
104 },
105 onDone: completer.complete,
106 cancelOnError: true);
107 return completer.future.then((_) {
108 _inAddStream = false;
109 });
110 }
111 }
OLDNEW
« no previous file with comments | « lib/src/isolate_channel.dart ('k') | lib/src/stream_channel_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698