Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(259)

Side by Side Diff: examples/dart/netcat/lib/main.dart

Issue 1414483010: Dart: Use a RawReceivePort to receive events for Mojo handles. (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « benchmarks/mojo_rtt_benchmark/lib/main.dart ('k') | mojo/dart/embedder/io/socket_patch.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!mojo mojo:dart_content_handler 1 #!mojo mojo:dart_content_handler
2 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Copyright 2014 The Chromium Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be 3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file. 4 // found in the LICENSE file.
5 5
6 import 'dart:async'; 6 import 'dart:async';
7 import 'dart:core'; 7 import 'dart:core';
8 import 'dart:typed_data'; 8 import 'dart:typed_data';
9 9
10 import 'package:mojo/application.dart'; 10 import 'package:mojo/application.dart';
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
42 // * Relatedly, we should listen for _socketSender's peer being closed (also 42 // * Relatedly, we should listen for _socketSender's peer being closed (also
43 // _socket, I guess). 43 // _socket, I guess).
44 // * Handle the socket send pipe being full (currently, we assume it's never 44 // * Handle the socket send pipe being full (currently, we assume it's never
45 // full). 45 // full).
46 class Connector { 46 class Connector {
47 final Application _application; 47 final Application _application;
48 files.FileProxy _terminal; 48 files.FileProxy _terminal;
49 TcpConnectedSocketProxy _socket; 49 TcpConnectedSocketProxy _socket;
50 MojoDataPipeProducer _socketSender; 50 MojoDataPipeProducer _socketSender;
51 MojoDataPipeConsumer _socketReceiver; 51 MojoDataPipeConsumer _socketReceiver;
52 MojoEventStream _socketReceiverEventStream; 52 MojoEventSubscription _socketReceiverEventSubscription;
53 final ByteData _readBuffer; 53 final ByteData _readBuffer;
54 final ByteData _writeBuffer; 54 final ByteData _writeBuffer;
55 55
56 // TODO(vtl): Don't just hard-code buffer sizes. 56 // TODO(vtl): Don't just hard-code buffer sizes.
57 Connector(this._application, this._terminal) 57 Connector(this._application, this._terminal)
58 : _readBuffer = new ByteData(16 * 1024), 58 : _readBuffer = new ByteData(16 * 1024),
59 _writeBuffer = new ByteData(16 * 1024); 59 _writeBuffer = new ByteData(16 * 1024);
60 60
61 Future connect(NetAddress remote_address) async { 61 Future connect(NetAddress remote_address) async {
62 try { 62 try {
(...skipping 11 matching lines...) Expand all
74 _socketReceiver = receiveDataPipe.consumer; 74 _socketReceiver = receiveDataPipe.consumer;
75 _socket = new TcpConnectedSocketProxy.unbound(); 75 _socket = new TcpConnectedSocketProxy.unbound();
76 await boundSocket.ptr.connect(remote_address, sendDataPipe.consumer, 76 await boundSocket.ptr.connect(remote_address, sendDataPipe.consumer,
77 receiveDataPipe.producer, _socket); 77 receiveDataPipe.producer, _socket);
78 await boundSocket.close(); 78 await boundSocket.close();
79 79
80 // Set up reading from the terminal. 80 // Set up reading from the terminal.
81 _startReadingFromTerminal(); 81 _startReadingFromTerminal();
82 82
83 // Set up reading from the socket. 83 // Set up reading from the socket.
84 _socketReceiverEventStream = new MojoEventStream(_socketReceiver.handle); 84 _socketReceiverEventSubscription =
85 _socketReceiverEventStream.listen(_onSocketReceiverEvent); 85 new MojoEventSubscription(_socketReceiver.handle);
86 _socketReceiverEventSubscription.subscribe(_onSocketReceiverEvent);
86 } catch (e) { 87 } catch (e) {
87 _shutDown(); 88 _shutDown();
88 } 89 }
89 } 90 }
90 91
91 void _startReadingFromTerminal() { 92 void _startReadingFromTerminal() {
92 // TODO(vtl): Do we have to do something on error? 93 // TODO(vtl): Do we have to do something on error?
93 _terminal.ptr 94 _terminal.ptr
94 .read(_writeBuffer.lengthInBytes, 0, files.Whence.FROM_CURRENT) 95 .read(_writeBuffer.lengthInBytes, 0, files.Whence.FROM_CURRENT)
95 .then(_onReadFromTerminal) 96 .then(_onReadFromTerminal)
96 .catchError((e) { _shutDown(); }); 97 .catchError((e) {
98 _shutDown();
99 });
97 } 100 }
98 101
99 void _onReadFromTerminal(files.FileReadResponseParams p) { 102 void _onReadFromTerminal(files.FileReadResponseParams p) {
100 if (p.error != files.Error.OK) { 103 if (p.error != files.Error.OK) {
101 // TODO(vtl): Do terminal errors. 104 // TODO(vtl): Do terminal errors.
102 return; 105 return;
103 } 106 }
104 107
105 // TODO(vtl): Verify that |bytesRead.length| is within the expected range. 108 // TODO(vtl): Verify that |bytesRead.length| is within the expected range.
106 for (var i = 0, j = 0; i < p.bytesRead.length; i++, j++) { 109 for (var i = 0, j = 0; i < p.bytesRead.length; i++, j++) {
(...skipping 13 matching lines...) Expand all
120 _startReadingFromTerminal(); 123 _startReadingFromTerminal();
121 } 124 }
122 125
123 void _onSocketReceiverEvent(List<int> event) { 126 void _onSocketReceiverEvent(List<int> event) {
124 var mojoSignals = new MojoHandleSignals(event[1]); 127 var mojoSignals = new MojoHandleSignals(event[1]);
125 var shouldShutDown = false; 128 var shouldShutDown = false;
126 if (mojoSignals.isReadable) { 129 if (mojoSignals.isReadable) {
127 var numBytesRead = _socketReceiver.read(_readBuffer); 130 var numBytesRead = _socketReceiver.read(_readBuffer);
128 if (_socketReceiver.status.isOk) { 131 if (_socketReceiver.status.isOk) {
129 assert(numBytesRead > 0); 132 assert(numBytesRead > 0);
130 _terminal.ptr.write(_readBuffer.buffer.asUint8List(0, numBytesRead), 0, 133 _terminal.ptr
131 files.Whence.FROM_CURRENT) 134 .write(_readBuffer.buffer.asUint8List(0, numBytesRead), 0,
132 .catchError((e) { _shutDown(); }); 135 files.Whence.FROM_CURRENT)
133 _socketReceiverEventStream.enableReadEvents(); 136 .catchError((e) {
137 _shutDown();
138 });
139 _socketReceiverEventSubscription.enableReadEvents();
134 } else { 140 } else {
135 shouldShutDown = true; 141 shouldShutDown = true;
136 } 142 }
137 } else if (mojoSignals.isPeerClosed) { 143 } else if (mojoSignals.isPeerClosed) {
138 shouldShutDown = true; 144 shouldShutDown = true;
139 } else { 145 } else {
140 throw 'Unexpected handle event: $mojoSignals'; 146 throw 'Unexpected handle event: $mojoSignals';
141 } 147 }
142 if (shouldShutDown) { 148 if (shouldShutDown) {
143 _shutDown(); 149 _shutDown();
144 } 150 }
145 } 151 }
146 152
147 void _shutDown() { 153 void _shutDown() {
148 if (_socketReceiverEventStream != null) { 154 if (_socketReceiverEventSubscription != null) {
149 ignoreFuture(_socketReceiverEventStream.close()); 155 ignoreFuture(_socketReceiverEventSubscription.close());
150 _socketReceiverEventStream = null; 156 _socketReceiverEventSubscription = null;
151 } 157 }
152 if (_socketSender != null) { 158 if (_socketSender != null) {
153 if (_socketSender.handle.isValid) 159 if (_socketSender.handle.isValid) _socketSender.handle.close();
154 _socketSender.handle.close();
155 _socketSender = null; 160 _socketSender = null;
156 } 161 }
157 if (_socketReceiver != null) { 162 if (_socketReceiver != null) {
158 if (_socketReceiver.handle.isValid) 163 if (_socketReceiver.handle.isValid) _socketReceiver.handle.close();
159 _socketReceiver.handle.close();
160 _socketReceiver = null; 164 _socketReceiver = null;
161 } 165 }
162 if (_terminal != null) { 166 if (_terminal != null) {
163 ignoreFuture(_terminal.close()); 167 ignoreFuture(_terminal.close());
164 _terminal = null; 168 _terminal = null;
165 } 169 }
166 } 170 }
167 } 171 }
168 172
169 class TerminalClientImpl implements TerminalClient { 173 class TerminalClientImpl implements TerminalClient {
170 TerminalClientStub _stub; 174 TerminalClientStub _stub;
171 Application _application; 175 Application _application;
172 String _resolvedUrl; 176 String _resolvedUrl;
173 177
174 TerminalClientImpl( 178 TerminalClientImpl(
175 this._application, this._resolvedUrl, MojoMessagePipeEndpoint endpoint) { 179 this._application, this._resolvedUrl, MojoMessagePipeEndpoint endpoint) {
176 _stub = new TerminalClientStub.fromEndpoint(endpoint, this); 180 _stub = new TerminalClientStub.fromEndpoint(endpoint, this);
177 } 181 }
178 182
179 @override 183 @override
180 void connectToTerminal(files.FileProxy terminal) { 184 void connectToTerminal(files.FileProxy terminal) {
181 var url = Uri.parse(_resolvedUrl); 185 var url = Uri.parse(_resolvedUrl);
182 NetAddress remote_address; 186 NetAddress remote_address;
183 try { 187 try {
184 remote_address = _getNetAddressFromUrl(url); 188 remote_address = _getNetAddressFromUrl(url);
185 } catch (e) { 189 } catch (e) {
186 fputs(terminal.ptr, 'HALP: Add a query: ?host=<host>&port=<port>\n' 190 fputs(
187 '(<host> must be "localhost" or n1.n2.n3.n4)\n\n' 191 terminal.ptr,
188 'Got query parameters:\n' + url.queryParameters.toString()); 192 'HALP: Add a query: ?host=<host>&port=<port>\n'
193 '(<host> must be "localhost" or n1.n2.n3.n4)\n\n'
194 'Got query parameters:\n' +
195 url.queryParameters.toString());
189 ignoreFuture(terminal.close()); 196 ignoreFuture(terminal.close());
190 return; 197 return;
191 } 198 }
192 199
193 // TODO(vtl): Currently, we only do IPv4, so this should work. 200 // TODO(vtl): Currently, we only do IPv4, so this should work.
194 fputs(terminal.ptr, 201 fputs(
195 'Connecting to: ' + remote_address.ipv4.addr.join('.') + ':' + 202 terminal.ptr,
196 remote_address.ipv4.port.toString() + '...'); 203 'Connecting to: ' +
204 remote_address.ipv4.addr.join('.') +
205 ':' +
206 remote_address.ipv4.port.toString() +
207 '...');
197 208
198 var connector = new Connector(_application, terminal); 209 var connector = new Connector(_application, terminal);
199 // TODO(vtl): Do we have to do something on error? 210 // TODO(vtl): Do we have to do something on error?
200 connector.connect(remote_address) 211 connector.connect(remote_address).catchError((e) {});
201 .catchError((e) {});
202 } 212 }
203 213
204 // Note: May throw all sorts of things. 214 // Note: May throw all sorts of things.
205 static NetAddress _getNetAddressFromUrl(Uri url) { 215 static NetAddress _getNetAddressFromUrl(Uri url) {
206 var params = url.queryParameters; 216 var params = url.queryParameters;
207 var host = params['host']; 217 var host = params['host'];
208 return makeIPv4NetAddress( 218 return makeIPv4NetAddress(
209 (host == 'localhost') ? [127, 0, 0, 1] : Uri.parseIPv4Address(host), 219 (host == 'localhost') ? [127, 0, 0, 1] : Uri.parseIPv4Address(host),
210 int.parse(params['port'])); 220 int.parse(params['port']));
211 } 221 }
212 } 222 }
213 223
214 class NetcatApplication extends Application { 224 class NetcatApplication extends Application {
215 NetcatApplication.fromHandle(MojoHandle handle) : super.fromHandle(handle); 225 NetcatApplication.fromHandle(MojoHandle handle) : super.fromHandle(handle);
216 226
217 @override 227 @override
218 void acceptConnection(String requestorUrl, String resolvedUrl, 228 void acceptConnection(String requestorUrl, String resolvedUrl,
219 ApplicationConnection connection) { 229 ApplicationConnection connection) {
220 connection.provideService(TerminalClientName, 230 connection.provideService(TerminalClientName,
221 (endpoint) => new TerminalClientImpl(this, resolvedUrl, endpoint)); 231 (endpoint) => new TerminalClientImpl(this, resolvedUrl, endpoint));
222 } 232 }
223 } 233 }
224 234
225 main(List args) { 235 main(List args) {
226 MojoHandle appHandle = new MojoHandle(args[0]); 236 MojoHandle appHandle = new MojoHandle(args[0]);
227 String url = args[1]; 237 String url = args[1];
228 new NetcatApplication.fromHandle(appHandle) 238 new NetcatApplication.fromHandle(appHandle)
229 ..onError = (() { 239 ..onError = ((Object e) {
230 MojoHandle.reportLeakedHandles(); 240 MojoHandle.reportLeakedHandles();
231 }); 241 });
232 } 242 }
OLDNEW
« no previous file with comments | « benchmarks/mojo_rtt_benchmark/lib/main.dart ('k') | mojo/dart/embedder/io/socket_patch.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698