Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(19)

Side by Side Diff: runtime/bin/socket_patch.dart

Issue 12316036: Merge IO v2 branch to bleeding edge (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebased to r18818 Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/bin/socket_macos.cc ('k') | runtime/bin/socket_win.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 patch class ServerSocket { 5 patch class RawServerSocket {
6 /* patch */ factory ServerSocket(String bindAddress, int port, int backlog) { 6 /* patch */ static Future<RawServerSocket> bind([String address = "127.0.0.1",
7 return new _ServerSocket(bindAddress, port, backlog); 7 int port = 0,
8 int backlog = 0]) {
9 return _RawServerSocket.bind(address, port, backlog);
8 } 10 }
9 } 11 }
10 12
11 13
12 patch class Socket { 14 patch class RawSocket {
13 /* patch */ factory Socket(String host, int port) => new _Socket(host, port); 15 /* patch */ static Future<RawSocket> connect(String host, int port) {
16 return _RawSocket.connect(host, port);
17 }
14 } 18 }
15 19
16 20
17 class _SocketBase extends NativeFieldWrapperClass1 { 21 // The _NativeSocket class encapsulates an OS socket.
22 class _NativeSocket extends NativeFieldWrapperClass1 {
18 // Bit flags used when communicating between the eventhandler and 23 // Bit flags used when communicating between the eventhandler and
19 // dart code. The EVENT flags are used to indicate events of 24 // dart code. The EVENT flags are used to indicate events of
20 // interest when sending a message from dart code to the 25 // interest when sending a message from dart code to the
21 // eventhandler. When receiving a message from the eventhandler the 26 // eventhandler. When receiving a message from the eventhandler the
22 // EVENT flags indicate the events that actually happened. The 27 // EVENT flags indicate the events that actually happened. The
23 // COMMAND flags are used to send commands from dart to the 28 // COMMAND flags are used to send commands from dart to the
24 // eventhandler. COMMAND flags are never received from the 29 // eventhandler. COMMAND flags are never received from the
25 // eventhandler. Additional flags are used to communicate other 30 // eventhandler. Additional flags are used to communicate other
26 // information. 31 // information.
27 static const int _IN_EVENT = 0; 32 static const int READ_EVENT = 0;
28 static const int _OUT_EVENT = 1; 33 static const int WRITE_EVENT = 1;
29 static const int _ERROR_EVENT = 2; 34 static const int ERROR_EVENT = 2;
30 static const int _CLOSE_EVENT = 3; 35 static const int CLOSED_EVENT = 3;
31 36 static const int FIRST_EVENT = READ_EVENT;
32 static const int _CLOSE_COMMAND = 8; 37 static const int LAST_EVENT = CLOSED_EVENT;
33 static const int _SHUTDOWN_READ_COMMAND = 9; 38 static const int EVENT_COUNT = LAST_EVENT - FIRST_EVENT + 1;
34 static const int _SHUTDOWN_WRITE_COMMAND = 10; 39
35 40 static const int CLOSE_COMMAND = 8;
36 // Flag send to the eventhandler providing additional information on 41 static const int SHUTDOWN_READ_COMMAND = 9;
37 // the type of the file descriptor. 42 static const int SHUTDOWN_WRITE_COMMAND = 10;
38 static const int _LISTENING_SOCKET = 16; 43 static const int FIRST_COMMAND = CLOSE_COMMAND;
39 static const int _PIPE = 17; 44 static const int LAST_COMMAND = SHUTDOWN_WRITE_COMMAND;
40 45
41 static const int _FIRST_EVENT = _IN_EVENT; 46 // Type flag send to the eventhandler providing additional
42 static const int _LAST_EVENT = _CLOSE_EVENT; 47 // information on the type of the file descriptor.
43 48 static const int LISTENING_SOCKET = 16;
44 static const int _FIRST_COMMAND = _CLOSE_COMMAND; 49 static const int PIPE_SOCKET = 17;
45 static const int _LAST_COMMAND = _SHUTDOWN_WRITE_COMMAND; 50 static const int TYPE_NORMAL_SOCKET = 0;
46 51 static const int TYPE_LISTENING_SOCKET = 1 << LISTENING_SOCKET;
47 _SocketBase () { 52 static const int TYPE_PIPE = 1 << PIPE_SOCKET;
48 _handlerMap = new List.fixedLength(_LAST_EVENT + 1); 53
49 _handlerMask = 0; 54 // Native port messages.
50 _canActivateHandlers = true; 55 static const HOST_NAME_LOOKUP = 0;
51 _closed = true; 56
57 // Socket close state
58 bool isClosed = false;
59 bool isClosedRead = false;
60 bool isClosedWrite = false;
61 Completer closeCompleter = new Completer();
62
63 // Handlers and receive port for socket events from the event handler.
64 int eventMask = 0;
65 List eventHandlers;
66 ReceivePort eventPort;
67
68 // Indicates if native interrupts can be activated.
69 bool canActivateEvents = true;
70
71 // The type flags for this socket.
72 final int typeFlags;
73
74 // Holds the port of the socket, null if not known.
75 int localPort;
76
77 // Native port for socket services.
78 static SendPort socketService;
79
80 static Future<_NativeSocket> connect(String host, int port) {
81 var completer = new Completer();
82 ensureSocketService();
83 socketService.call([HOST_NAME_LOOKUP, host]).then((response) {
84 if (isErrorResponse(response)) {
85 completer.completeError(
86 createError(response, "Failed host name lookup"));
87 } else {
88 var socket = new _NativeSocket.normal();
89 var result = socket.nativeCreateConnect(response, port);
90 if (result is OSError) {
91 completer.completeError(createError(result, "Connection failed"));
92 } else {
93 // Setup handlers for receiving the first write event which
94 // indicate that the socket is fully connected.
95 socket.setHandlers(
96 write: () {
97 socket.setListening(read: false, write: false);
98 completer.complete(socket);
99 },
100 error: (e) {
101 socket.close();
102 completer.completeError(createError(e, "Connection failed"));
103 }
104 );
105 socket.setListening(read: false, write: true);
106 }
107 }
108 });
109 return completer.future;
110 }
111
112 static Future<_NativeSocket> bind(String address,
113 int port,
114 int backlog) {
115 var socket = new _NativeSocket.listen();
116 var result = socket.nativeCreateBindListen(address, port, backlog);
117 if (result is OSError) {
118 return new Future.immediateError(
119 new SocketIOException("Failed to create server socket", result));
120 }
121 if (port != 0) socket.localPort = port;
122 return new Future.immediate(socket);
123 }
124
125 _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET {
126 eventHandlers = new List.fixedLength(EVENT_COUNT + 1);
52 _EventHandler._start(); 127 _EventHandler._start();
53 _hashCode = _nextHashCode; 128 }
54 _nextHashCode = (_nextHashCode + 1) & 0xFFFFFFF; 129
130 _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET {
131 eventHandlers = new List.fixedLength(EVENT_COUNT + 1);
132 _EventHandler._start();
133 }
134
135 _NativeSocket.pipe() : typeFlags = TYPE_PIPE {
136 eventHandlers = new List.fixedLength(EVENT_COUNT + 1);
137 _EventHandler._start();
138 }
139
140 int available() {
141 if (isClosed) return 0;
142 var result = nativeAvailable();
143 if (result is OSError) {
144 reportError(result, "Available failed");
145 return 0;
146 } else {
147 return result;
148 }
149 }
150
151 List<int> read(int len) {
152 if (len != null && len <= 0) {
153 throw new ArgumentError("Illegal length $len");
154 }
155 var result = nativeRead(len == null ? -1 : len);
156 if (result is OSError) {
157 reportError(result, "Read failed");
158 return null;
159 }
160 return result;
161 }
162
163 int write(List<int> buffer, int offset, int bytes) {
164 if (buffer is! List) throw new ArgumentError();
165 if (offset == null) offset = 0;
166 if (bytes == null) bytes = buffer.length;
167 if (offset < 0) throw new RangeError.value(offset);
168 if (bytes < 0) throw new RangeError.value(bytes);
169 if ((offset + bytes) > buffer.length) {
170 throw new RangeError.value(offset + bytes);
171 }
172 if (offset is! int || bytes is! int) {
173 throw new ArgumentError("Invalid arguments to write on Socket");
174 }
175 if (isClosed) return 0;
176 if (bytes == 0) return 0;
177 _BufferAndOffset bufferAndOffset =
178 _ensureFastAndSerializableBuffer(buffer, offset, bytes);
179 var result =
180 nativeWrite(bufferAndOffset.buffer, bufferAndOffset.offset, bytes);
181 if (result is OSError) {
182 reportError(result, "Write failed");
183 result = 0;
184 }
185 return result;
186 }
187
188 _NativeSocket accept() {
189 var socket = new _NativeSocket.normal();
190 if (nativeAccept(socket) != true) return null;
191 return socket;
192 }
193
194 int get port {
195 if (localPort != null) return localPort;
196 return localPort = nativeGetPort();
197 }
198
199 int get remotePort {
200 return nativeGetRemotePeer()[1];
201 }
202
203 String get remoteHost {
204 return nativeGetRemotePeer()[0];
55 } 205 }
56 206
57 // Multiplexes socket events to the socket handlers. 207 // Multiplexes socket events to the socket handlers.
58 void _multiplex(int event_mask) { 208 void multiplex(int events) {
59 _canActivateHandlers = false; 209 canActivateEvents = false;
60 for (int i = _FIRST_EVENT; i <= _LAST_EVENT; i++) { 210 for (int i = FIRST_EVENT; i <= LAST_EVENT; i++) {
61 if (((event_mask & (1 << i)) != 0)) { 211 if (((events & (1 << i)) != 0)) {
62 if ((i == _CLOSE_EVENT) && this is _Socket && !_closed) { 212 if (i == CLOSED_EVENT &&
63 _closedRead = true; 213 typeFlags != TYPE_LISTENING_SOCKET &&
64 if (_closedWrite) _close(); 214 !isClosed) {
215 isClosedRead = true;
65 } 216 }
66 217
67 var eventHandler = _handlerMap[i]; 218 var handler = eventHandlers[i];
68 if (eventHandler != null || i == _ERROR_EVENT) { 219 assert(handler != null);
220 if (i == WRITE_EVENT) {
221 // If the event was disabled before we had a chance to fire the event,
222 // discard it. If we register again, we'll get a new one.
223 if ((eventMask & (1 << i)) == 0) continue;
69 // Unregister the out handler before executing it. There is 224 // Unregister the out handler before executing it. There is
70 // no need to notify the eventhandler as handlers are 225 // no need to notify the eventhandler as handlers are
71 // disabled while the event is handled. 226 // disabled while the event is handled.
72 if (i == _OUT_EVENT) _setHandler(i, null, notifyEventhandler: false); 227 eventMask &= ~(1 << i);
73
74 // Don't call the in handler if there is no data available
75 // after all.
76 if ((i == _IN_EVENT) && (this is _Socket) && (available() == 0)) {
77 continue;
78 }
79 if (i == _ERROR_EVENT) {
80 _reportError(_getError(), "");
81 close();
82 } else {
83 eventHandler();
84 }
85 } 228 }
86 } 229
87 } 230 // Don't call the in handler if there is no data available
88 _canActivateHandlers = true; 231 // after all.
89 _activateHandlers(); 232 if (i == READ_EVENT &&
90 } 233 typeFlags != TYPE_LISTENING_SOCKET &&
91 234 available() == 0) {
92 void _setHandler(int event, 235 continue;
93 Function callback,
94 {bool notifyEventhandler: true}) {
95 if (callback == null) {
96 _handlerMask &= ~(1 << event);
97 } else {
98 _handlerMask |= (1 << event);
99 }
100 _handlerMap[event] = callback;
101 // If the socket is only for writing then close the receive port
102 // when not waiting for any events.
103 if (this is _Socket &&
104 _closedRead &&
105 _handlerMask == 0 &&
106 _handler != null) {
107 _handler.close();
108 _handler = null;
109 } else {
110 if (notifyEventhandler) _activateHandlers();
111 }
112 }
113
114 OSError _getError() native "Socket_GetError";
115
116 int _getPort() native "Socket_GetPort";
117
118 void set onError(void callback(e)) {
119 _setHandler(_ERROR_EVENT, callback);
120 }
121
122 void _activateHandlers() {
123 if (_canActivateHandlers && !_closed) {
124 if (_handlerMask == 0) {
125 if (_handler != null) {
126 _handler.close();
127 _handler = null;
128 } 236 }
129 return; 237 if (i == ERROR_EVENT) {
130 } 238 reportError(nativeGetError(), "");
131 int data = _handlerMask; 239 } else if (!isClosed) {
132 if (_isListenSocket()) { 240 handler();
133 data |= (1 << _LISTENING_SOCKET); 241 }
242 }
243 }
244 if (isClosedRead && isClosedWrite) close();
245 canActivateEvents = true;
246 activateHandlers();
247 }
248
249 void setHandlers({read: null, write: null, error: null, closed: null}) {
250 eventHandlers[READ_EVENT] = read;
251 eventHandlers[WRITE_EVENT] = write;
252 eventHandlers[ERROR_EVENT] = error;
253 eventHandlers[CLOSED_EVENT] = closed;
254 }
255
256 void setListening({read: true, write: true}) {
257 eventMask = (1 << CLOSED_EVENT) | (1 << ERROR_EVENT);
258 if (read) eventMask |= (1 << READ_EVENT);
259 if (write) eventMask |= (1 << WRITE_EVENT);
260 activateHandlers();
261 }
262
263 Future get closeFuture => closeCompleter.future;
264
265 void activateHandlers() {
266 if (canActivateEvents && !isClosed) {
267 // If we don't listen for either read or write, disconnect as we won't
268 // get close and error events anyway.
269 if ((eventMask & ((1 << READ_EVENT) | (1 << WRITE_EVENT))) == 0) {
270 if (eventPort != null) disconnectFromEventHandler();
134 } else { 271 } else {
135 if (_closedRead) { data &= ~(1 << _IN_EVENT); } 272 int data = eventMask;
136 if (_closedWrite) { data &= ~(1 << _OUT_EVENT); } 273 data |= typeFlags;
137 if (_isPipe()) data |= (1 << _PIPE); 274 if (isClosedRead) data &= ~(1 << READ_EVENT);
138 } 275 if (isClosedWrite) data &= ~(1 << WRITE_EVENT);
139 _sendToEventHandler(data); 276 sendToEventHandler(data);
140 } 277 }
141 } 278 }
142 279 }
143 int get port { 280
144 if (_port == null) { 281 void close() {
145 _port = _getPort(); 282 if (!isClosed) {
146 } 283 sendToEventHandler(1 << CLOSE_COMMAND);
147 return _port; 284 isClosed = true;
148 } 285 closeCompleter.complete(this);
149 286 }
150 void close([bool halfClose = false]) { 287 // Outside the if support closing sockets created but never
151 if (!_closed) { 288 // assigned any actual socket.
152 if (halfClose) { 289 disconnectFromEventHandler();
153 _closeWrite(); 290 }
291
292 void shutdown(SocketDirection direction) {
293 if (!isClosed) {
294 switch (direction) {
295 case SocketDirection.RECEIVE:
296 shutdownRead();
297 break;
298 case SocketDirection.SEND:
299 shutdownWrite();
300 break;
301 case SocketDirection.BOTH:
302 close();
303 break;
304 default:
305 throw new ArgumentError(direction);
306 }
307 }
308 }
309
310 void shutdownWrite() {
311 if (!isClosed) {
312 if (isClosedRead) {
313 close();
154 } else { 314 } else {
155 _close(); 315 sendToEventHandler(1 << SHUTDOWN_WRITE_COMMAND);
156 } 316 }
157 } else if (_handler != null) { 317 isClosedWrite = true;
158 // This is to support closing sockets created but never assigned 318 }
159 // any actual socket. 319 }
160 _handler.close(); 320
161 _handler = null; 321 void shutdownRead() {
162 } 322 if (!isClosed) {
163 } 323 if (isClosedWrite) {
164 324 close();
165 void _closeWrite() {
166 if (!_closed) {
167 if (_closedRead) {
168 _close();
169 } else { 325 } else {
170 _sendToEventHandler(1 << _SHUTDOWN_WRITE_COMMAND); 326 sendToEventHandler(1 << SHUTDOWN_READ_COMMAND);
171 } 327 }
172 _closedWrite = true; 328 isClosedRead = true;
173 } 329 }
174 } 330 }
175 331
176 void _closeRead() { 332 void sendToEventHandler(int data) {
177 if (!_closed) { 333 connectToEventHandler();
178 if (_closedWrite) { 334 assert(!isClosed);
179 _close(); 335 _EventHandler._sendData(this, eventPort, data);
180 } else { 336 }
181 _sendToEventHandler(1 << _SHUTDOWN_READ_COMMAND); 337
182 } 338 void connectToEventHandler() {
183 _closedRead = true; 339 if (eventPort == null) {
184 } 340 eventPort = new ReceivePort();
185 } 341 eventPort.receive ((var message, _) => multiplex(message));
186 342 }
187 void _close() { 343 }
188 if (!_closed) { 344
189 _sendToEventHandler(1 << _CLOSE_COMMAND); 345 void disconnectFromEventHandler() {
190 _handler.close(); 346 if (eventPort != null) {
191 _handler = null; 347 eventPort.close();
192 _closed = true; 348 eventPort = null;
193 } 349 }
194 } 350 }
195 351
196 void _sendToEventHandler(int data) { 352 static void ensureSocketService() {
197 if (_handler == null) { 353 if (socketService == null) {
198 _handler = new ReceivePort(); 354 socketService = _NativeSocket.newServicePort();
199 _handler.receive((var message, ignored) { _multiplex(message); }); 355 }
200 } 356 }
201 assert(!_closed); 357
202 _EventHandler._sendData(this, _handler, data); 358 // Check whether this is an error response from a native port call.
203 } 359 static bool isErrorResponse(response) {
204 360 return response is List && response[0] != _SUCCESS_RESPONSE;
205 bool _reportError(error, String message) { 361 }
206 void doReportError(Exception e) { 362
207 // Invoke the socket error callback if any. 363 // Create the appropriate error/exception from different returned
208 bool reported = false; 364 // error objects.
209 if (_handlerMap[_ERROR_EVENT] != null) { 365 static createError(error, String message) {
210 _handlerMap[_ERROR_EVENT](e);
211 reported = true;
212 }
213 // Propagate the error to any additional listeners.
214 reported = reported || _propagateError(e);
215 if (!reported) throw e;
216 }
217
218 // For all errors we close the socket, call the error handler and
219 // disable further calls of the error handler.
220 close();
221 if (error is OSError) { 366 if (error is OSError) {
222 doReportError(new SocketIOException(message, error)); 367 return new SocketIOException(message, error);
223 } else if (error is List) { 368 } else if (error is List) {
224 assert(_isErrorResponse(error)); 369 assert(isErrorResponse(error));
225 switch (error[0]) { 370 switch (error[0]) {
226 case _ILLEGAL_ARGUMENT_RESPONSE: 371 case _ILLEGAL_ARGUMENT_RESPONSE:
227 doReportError(new ArgumentError()); 372 return new ArgumentError();
228 break;
229 case _OSERROR_RESPONSE: 373 case _OSERROR_RESPONSE:
230 doReportError(new SocketIOException( 374 return new SocketIOException(
231 message, new OSError(error[2], error[1]))); 375 message, new OSError(error[2], error[1]));
232 break;
233 default: 376 default:
234 doReportError(new Exception("Unknown error")); 377 return new Exception("Unknown error");
235 break;
236 } 378 }
237 } else { 379 } else {
238 doReportError(new SocketIOException(message)); 380 return new SocketIOException(message);
239 } 381 }
240 } 382 }
241 383
242 int get hashCode => _hashCode; 384 void reportError(error, String message) {
243 385 var e = createError(error, message);
244 bool _propagateError(Exception e) => false; 386 // Invoke the error handler if any.
245 387 if (eventHandlers[ERROR_EVENT] != null) {
246 bool _isListenSocket(); 388 eventHandlers[ERROR_EVENT](e);
247 bool _isPipe(); 389 }
248 390 // For all errors we close the socket
249 // Is this socket closed. 391 close();
250 bool _closed; 392 }
251 393
252 // Dedicated ReceivePort for socket events. 394 nativeAvailable() native "Socket_Available";
253 ReceivePort _handler; 395 nativeRead(int len) native "Socket_Read";
254 396 nativeWrite(List<int> buffer, int offset, int bytes)
255 // Poll event to handler map. 397 native "Socket_WriteList";
256 List _handlerMap; 398 bool nativeCreateConnect(String host, int port) native "Socket_CreateConnect";
257 399 nativeCreateBindListen(String address, int port, int backlog)
258 // Indicates for which poll events the socket registered handlers.
259 int _handlerMask;
260
261 // Indicates if native interrupts can be activated.
262 bool _canActivateHandlers;
263
264 // Holds the port of the socket, null if not known.
265 int _port;
266
267 // Hash code for the socket. Currently this is just a counter.
268 int _hashCode;
269 static int _nextHashCode = 0;
270 bool _closedRead = false;
271 bool _closedWrite = false;
272 }
273
274
275 class _ServerSocket extends _SocketBase implements ServerSocket {
276 // Constructor for server socket. First a socket object is allocated
277 // in which the native socket is stored. After that _createBind
278 // is called which creates a file descriptor and binds the given address
279 // and port to the socket. Null is returned if file descriptor creation or
280 // bind failed.
281 factory _ServerSocket(String bindAddress, int port, int backlog) {
282 _ServerSocket socket = new _ServerSocket._internal();
283 var result = socket._createBindListen(bindAddress, port, backlog);
284 if (result is OSError) {
285 socket.close();
286 throw new SocketIOException("Failed to create server socket", result);
287 }
288 socket._closed = false;
289 assert(result);
290 if (port != 0) {
291 socket._port = port;
292 }
293 return socket;
294 }
295
296 _ServerSocket._internal();
297
298 _accept(Socket socket) native "ServerSocket_Accept";
299
300 _createBindListen(String bindAddress, int port, int backlog)
301 native "ServerSocket_CreateBindListen"; 400 native "ServerSocket_CreateBindListen";
302 401 nativeAccept(_NativeSocket socket) native "ServerSocket_Accept";
303 void set onConnection(void callback(Socket connection)) { 402 int nativeGetPort() native "Socket_GetPort";
304 _clientConnectionHandler = callback; 403 List nativeGetRemotePeer() native "Socket_GetRemotePeer";
305 _setHandler(_SocketBase._IN_EVENT, 404 OSError nativeGetError() native "Socket_GetError";
306 _clientConnectionHandler != null ? _connectionHandler : null); 405
307 } 406 static SendPort newServicePort() native "Socket_NewServicePort";
308 407 }
309 void _connectionHandler() { 408
310 if (!_closed) { 409
311 _Socket socket = new _Socket._internal(); 410 class _RawServerSocket extends Stream<RawSocket>
312 var result = _accept(socket); 411 implements RawServerSocket {
313 if (result is OSError) { 412 final _NativeSocket _socket;
314 _reportError(result, "Accept failed"); 413 StreamController<RawSocket> _controller;
315 } else if (result) { 414
316 socket._closed = false; 415 static Future<_RawServerSocket> bind(String address,
317 _clientConnectionHandler(socket); 416 int port,
417 int backlog) {
418 if (port < 0 || port > 0xFFFF)
419 throw new ArgumentError("Invalid port $port");
420 if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog");
421 return _NativeSocket.bind(address, port, backlog)
422 .then((socket) => new _RawServerSocket(socket));
423 }
424
425 _RawServerSocket(this._socket) {
426 _controller = new StreamController(
427 onSubscriptionStateChange: _onSubscriptionStateChange,
428 onPauseStateChange: _onPauseStateChange);
429 _socket.closeFuture.then((_) => _controller.close());
430 _socket.setHandlers(
431 read: () {
432 var socket = _socket.accept();
433 if (socket != null) _controller.add(new _RawSocket(socket));
434 },
435 error: (e) {
436 _controller.signalError(new AsyncError(e));
437 _controller.close();
438 }
439 );
440 }
441
442 StreamSubscription<RawSocket> listen(void onData(RawSocket event),
443 {void onError(AsyncError error),
444 void onDone(),
445 bool unsubscribeOnError}) {
446 return _controller.stream.listen(
447 onData,
448 onError: onError,
449 onDone: onDone,
450 unsubscribeOnError: unsubscribeOnError);
451 }
452
453 int get port => _socket.port;
454
455 void close() => _socket.close();
456
457 void _pause() {
458 _socket.setListening(read: false, write: false);
459 }
460
461 void _resume() {
462 _socket.setListening(read: true, write: false);
463 }
464
465 void _onSubscriptionStateChange() {
466 if (_controller.hasSubscribers) {
467 _resume();
468 } else {
469 close();
470 }
471 }
472 void _onPauseStateChange() {
473 if (_controller.isPaused) {
474 _pause();
475 } else {
476 _resume();
477 }
478 }
479 }
480
481
482 class _RawSocket extends Stream<RawSocketEvent>
483 implements RawSocket {
484 final _NativeSocket _socket;
485 StreamController<RawSocketEvent> _controller;
486 bool _readEventsEnabled = true;
487 bool _writeEventsEnabled = true;
488
489 static Future<RawSocket> connect(String host, int port) {
490 return _NativeSocket.connect(host, port)
491 .then((socket) => new _RawSocket(socket));
492 }
493
494 _RawSocket(this._socket) {
495 _controller = new StreamController(
496 onSubscriptionStateChange: _onSubscriptionStateChange,
497 onPauseStateChange: _onPauseStateChange);
498 _socket.closeFuture.then((_) => _controller.close());
499 _socket.setHandlers(
500 read: () => _controller.add(RawSocketEvent.READ),
501 write: () {
502 // The write event handler is automatically disabled by the
503 // event handler when it fires.
504 _writeEventsEnabled = false;
505 _controller.add(RawSocketEvent.WRITE);
506 },
507 closed: () => _controller.add(RawSocketEvent.READ_CLOSED),
508 error: (e) {
509 _controller.signalError(new AsyncError(e));
510 close();
511 }
512 );
513 }
514
515 factory _RawSocket._writePipe(int fd) {
516 var native = new _NativeSocket.pipe();
517 native.isClosedRead = true;
518 if (fd != null) _getStdioHandle(native, fd);
519 return new _RawSocket(native);
520 }
521
522 factory _RawSocket._readPipe(int fd) {
523 var native = new _NativeSocket.pipe();
524 native.isClosedWrite = true;
525 if (fd != null) _getStdioHandle(native, fd);
526 return new _RawSocket(native);
527 }
528
529 StreamSubscription<RawSocketEvent> listen(void onData(RawSocketEvent event),
530 {void onError(AsyncError error),
531 void onDone(),
532 bool unsubscribeOnError}) {
533 return _controller.stream.listen(
534 onData,
535 onError: onError,
536 onDone: onDone,
537 unsubscribeOnError: unsubscribeOnError);
538 }
539
540 int available() => _socket.available();
541
542 List<int> read([int len]) => _socket.read(len);
543
544 int write(List<int> buffer, [int offset, int count]) =>
545 _socket.write(buffer, offset, count);
546
547 void close() => _socket.close();
548
549 void shutdown(SocketDirection direction) => _socket.shutdown(direction);
550
551 int get port => _socket.port;
552
553 int get remotePort => _socket.remotePort;
554
555 String get remoteHost => _socket.remoteHost;
556
557 bool get readEventsEnabled => _readEventsEnabled;
558 void set readEventsEnabled(bool value) {
559 if (value != _readEventsEnabled) {
560 _readEventsEnabled = value;
561 if (!_controller.isPaused) _resume();
562 }
563 }
564
565 bool get writeEventsEnabled => _writeEventsEnabled;
566 void set writeEventsEnabled(bool value) {
567 if (value != _writeEventsEnabled) {
568 _writeEventsEnabled = value;
569 if (!_controller.isPaused) _resume();
570 }
571 }
572
573 _pause() {
574 _socket.setListening(read: false, write: false);
575 }
576
577 void _resume() {
578 _socket.setListening(read: _readEventsEnabled, write: _writeEventsEnabled);
579 }
580
581 void _onPauseStateChange() {
582 if (_controller.isPaused) {
583 _pause();
584 } else {
585 _resume();
586 }
587 }
588
589 void _onSubscriptionStateChange() {
590 if (_controller.hasSubscribers) {
591 _resume();
592 } else {
593 close();
594 }
595 }
596 }
597
598
599 patch class ServerSocket {
600 /* patch */ static Future<ServerSocket> bind([String address = "127.0.0.1",
601 int port = 0,
602 int backlog = 0]) {
603 return _ServerSocket.bind(address, port, backlog);
604 }
605 }
606
607 class _ServerSocket extends Stream<Socket>
608 implements ServerSocket {
609 final _socket;
610
611 static Future<_ServerSocket> bind(String address,
612 int port,
613 int backlog) {
614 return _RawServerSocket.bind(address, port, backlog)
615 .then((socket) => new _ServerSocket(socket));
616 }
617
618 _ServerSocket(this._socket);
619
620 StreamSubscription<Socket> listen(void onData(Socket event),
621 {void onError(AsyncError error),
622 void onDone(),
623 bool unsubscribeOnError}) {
624 return _socket.map((rawSocket) => new _Socket(rawSocket)).listen(
625 onData,
626 onError: onError,
627 onDone: onDone,
628 unsubscribeOnError: unsubscribeOnError);
629 }
630
631 int get port => _socket.port;
632
633 void close() => _socket.close();
634 }
635
636
637 patch class Socket {
638 /* patch */ static Future<Socket> connect(String host, int port) {
639 return RawSocket.connect(host, port).then(
640 (socket) => new _Socket(socket));
641 }
642 }
643
644
645 patch class SecureSocket {
646 /* patch */ factory SecureSocket._(RawSecureSocket rawSocket) =>
647 new _SecureSocket(rawSocket);
648 }
649
650
651 class _SocketStreamConsumer extends StreamConsumer<List<int>, Socket> {
652 StreamSubscription subscription;
653 final _Socket socket;
654 int offset;
655 List<int> buffer;
656 bool paused = false;
657
658 _SocketStreamConsumer(this.socket);
659
660 Future<Socket> consume(Stream<List<int>> stream) {
661 subscription = stream.listen(
662 (data) {
663 assert(!paused);
664 assert(buffer == null);
665 buffer = data;
666 offset = 0;
667 write();
668 },
669 onDone: () {
670 socket._consumerDone();
671 });
672 return socket._doneFuture;
673 }
674
675 void write() {
676 try {
677 if (subscription == null) return;
678 assert(buffer != null);
679 // Write as much as possible.
680 offset += socket._write(buffer, offset, buffer.length - offset);
681 if (offset < buffer.length) {
682 if (!paused) {
683 paused = true;
684 // TODO(ajohnsen): It would be nice to avoid this check.
685 // Some info: socket._write can emit an event, if it fails to write.
686 // If the user closes the socket in that event, stop() will be called
687 // before we get a change to pause.
688 if (subscription == null) return;
689 subscription.pause();
690 }
691 socket._enableWriteEvent();
318 } else { 692 } else {
319 // Temporary failure accepting the connection. Ignoring 693 buffer = null;
320 // temporary failures lets us retry when we wake up with data 694 if (paused) {
321 // on the listening socket again. 695 paused = false;
322 } 696 subscription.resume();
323 }
324 }
325
326 bool _isListenSocket() => true;
327 bool _isPipe() => false;
328
329 var _clientConnectionHandler;
330 }
331
332
333 class _Socket extends _SocketBase implements Socket {
334 static const HOST_NAME_LOOKUP = 0;
335
336 // Constructs a new socket. During the construction an asynchronous
337 // host name lookup is initiated. The returned socket is not yet
338 // connected but ready for registration of callbacks.
339 factory _Socket(String host, int port) {
340 Socket socket = new _Socket._internal();
341 _ensureSocketService();
342 List request = new List.fixedLength(2);
343 request[0] = HOST_NAME_LOOKUP;
344 request[1] = host;
345 _socketService.call(request).then((response) {
346 if (socket._isErrorResponse(response)) {
347 socket._reportError(response, "Failed host name lookup");
348 } else{
349 var result = socket._createConnect(response, port);
350 if (result is OSError) {
351 socket.close();
352 socket._reportError(result, "Connection failed");
353 } else {
354 socket._closed = false;
355 socket._activateHandlers();
356 } 697 }
357 } 698 }
358 }); 699 } catch (e) {
359 return socket; 700 socket._consumerDone(e);
360 } 701 }
361 702 }
362 _Socket._internal(); 703
363 _Socket._internalReadOnly() : _pipe = true { super._closedWrite = true; } 704 void stop() {
364 _Socket._internalWriteOnly() : _pipe = true { super._closedRead = true; } 705 if (subscription == null) return;
365 706 subscription.cancel();
366 int available() { 707 subscription = null;
367 if (!_closed) { 708 socket._disableWriteEvent();
368 var result = _available(); 709 }
369 if (result is OSError) { 710 }
370 _reportError(result, "Available failed"); 711
371 return 0; 712
713 class _Socket extends Stream<List<int>> implements Socket {
714 RawSocket _raw; // Set to null when the raw socket is closed.
715 bool _closed = false; // Set to true when the raw socket is closed.
716 StreamController _controller;
717 bool _controllerClosed = false;
718 _SocketStreamConsumer _consumer;
719 IOSink<Socket> _sink;
720 Completer _doneCompleter;
721 var _subscription;
722
723 _Socket(RawSocket this._raw) {
724 _controller = new StreamController<List<int>>(
725 onSubscriptionStateChange: _onSubscriptionStateChange,
726 onPauseStateChange: _onPauseStateChange);
727 _consumer = new _SocketStreamConsumer(this);
728 _sink = new IOSink(_consumer);
729
730 // Disable read events until there is a subscription.
731 _raw.readEventsEnabled = false;
732
733 // Disable write events until the consumer needs it for pending writes.
734 _raw.writeEventsEnabled = false;
735 }
736
737 factory _Socket._writePipe([int fd]) {
738 return new _Socket(new _RawSocket._writePipe(fd));
739 }
740
741 factory _Socket._readPipe([int fd]) {
742 return new _Socket(new _RawSocket._readPipe(fd));
743 }
744
745 _NativeSocket get _nativeSocket => _raw._socket;
746
747 StreamSubscription<List<int>> listen(void onData(List<int> event),
748 {void onError(AsyncError error),
749 void onDone(),
750 bool unsubscribeOnError}) {
751 return _controller.stream.listen(
752 onData,
753 onError: onError,
754 onDone: onDone,
755 unsubscribeOnError: unsubscribeOnError);
756 }
757
758 Future<Socket> consume(Stream<List<int>> stream) {
759 return _sink.consume(stream);
760 }
761
762 Future<Socket> addStream(Stream<List<int>> stream) {
763 return _sink.addStream(stream);
764 }
765
766 void add(List<int> data) {
767 return _sink.add(data);
768 }
769
770 void addString(String string, [Encoding encoding = Encoding.UTF_8]) {
771 return _sink.addString(string, encoding);
772 }
773
774 close() => _sink.close();
775
776 Future<Socket> get done => _sink.done;
777
778 void destroy() {
779 // Destroy can always be called to get rid of a socket.
780 if (_raw == null) return;
781 _closeRawSocket();
782 _consumer.stop();
783 _controllerClosed = true;
784 _controller.close();
785 }
786
787 int get port => _raw.port;
788 String get remoteHost => _raw.remoteHost;
789 int get remotePort => _raw.remotePort;
790
791 // Ensure a subscription on the raw socket. Both the stream and the
792 // consumer needs a subscription as they share the error and done
793 // events from the raw socket.
794 void _ensureRawSocketSubscription() {
795 if (_subscription == null) {
796 _subscription = _raw.listen(_onData,
797 onError: _onError,
798 onDone: _onDone,
799 unsubscribeOnError: true);
800 }
801 }
802
803 _closeRawSocket() {
804 var tmp = _raw;
805 _raw = null;
806 _closed = true;
807 tmp.close();
808 }
809
810 void _onSubscriptionStateChange() {
811 if (_controller.hasSubscribers) {
812 _ensureRawSocketSubscription();
813 // Enable read events for providing data to subscription.
814 if (_raw != null) {
815 _raw.readEventsEnabled = true;
816 }
817 } else {
818 _controllerClosed = true;
819 if (_raw != null) {
820 _raw.shutdown(SocketDirection.RECEIVE);
821 }
822 }
823 }
824
825 void _onPauseStateChange() {
826 if (_raw != null) {
827 _raw.readEventsEnabled = !_controller.isPaused;
828 }
829 }
830
831 void _onData(event) {
832 switch (event) {
833 case RawSocketEvent.READ:
834 _controller.add(_raw.read());
835 break;
836 case RawSocketEvent.WRITE:
837 _consumer.write();
838 break;
839 case RawSocketEvent.READ_CLOSED:
840 _controllerClosed = true;
841 _controller.close();
842 break;
843 }
844 }
845
846 void _onDone() {
847 if (!_controllerClosed) {
848 _controllerClosed = true;
849 _controller.close();
850 }
851 _done();
852 }
853
854 void _onError(error) {
855 if (!_controllerClosed) {
856 _controllerClosed = true;
857 _controller.signalError(error);
858 _controller.close();
859 }
860 _done(error);
861 }
862
863 get _doneFuture {
864 if (_doneCompleter == null) {
865 _ensureRawSocketSubscription();
866 _doneCompleter = new Completer();
867 }
868 return _doneCompleter.future;
869 }
870
871 void _done([error]) {
872 if (_doneCompleter != null) {
873 var tmp = _doneCompleter;
874 _doneCompleter = null;
875 if (error != null) {
876 tmp.completeError(error);
372 } else { 877 } else {
373 return result; 878 tmp.complete(this);
374 } 879 }
375 } 880 }
376 throw new 881 }
377 SocketIOException("Error: available failed - invalid socket handle"); 882
378 } 883 int _write(List<int> data, int offset, int length) =>
379 884 _raw.write(data, offset, length);
380 _available() native "Socket_Available"; 885
381 886 void _enableWriteEvent() {
382 List<int> read([int len]) { 887 _raw.writeEventsEnabled = true;
383 if (len != null && len <= 0) { 888 }
384 throw new SocketIOException("Illegal length $len"); 889
385 } 890 void _disableWriteEvent() {
386 var result = _read(len == null ? -1 : len); 891 if (_raw != null) {
387 if (result is OSError) { 892 _raw.writeEventsEnabled = false;
388 _reportError(result, "Read failed"); 893 }
389 return null; 894 }
390 } 895
391 return result; 896 void _consumerDone([error]) {
392 } 897 if (_raw != null) {
393 898 _raw.shutdown(SocketDirection.SEND);
394 _read(int len) native "Socket_Read"; 899 _disableWriteEvent();
395 900 }
396 int readList(List<int> buffer, int offset, int bytes) { 901 _done(error);
397 if (!_closed) { 902 }
398 if (bytes == 0) { 903 }
399 return 0; 904
400 } 905
401 if (offset < 0) { 906 class _SecureSocket extends _Socket implements SecureSocket {
402 throw new RangeError.value(offset); 907 _SecureSocket(RawSecureSocket raw) : super(raw);
403 } 908
404 if (bytes < 0) { 909 void set onBadCertificate(bool callback(X509Certificate certificate)) {
405 throw new RangeError.value(bytes); 910 if (_raw == null) {
406 } 911 throw new StateError("onBadCertificate called on destroyed SecureSocket");
407 if ((offset + bytes) > buffer.length) { 912 }
408 throw new RangeError.value(offset + bytes); 913 _raw.onBadCertificate = callback;
409 } 914 }
410 var result = _readList(buffer, offset, bytes); 915
411 if (result is OSError) { 916 X509Certificate get peerCertificate {
412 _reportError(result, "Read failed"); 917 if (_raw == null) {
413 return -1; 918 throw new StateError("peerCertificate called on destroyed SecureSocket");
414 } 919 }
415 return result; 920 return _raw.peerCertificate;
416 } 921 }
417 throw new 922 }
418 SocketIOException("Error: readList failed - invalid socket handle");
419 }
420
421 _readList(List<int> buffer, int offset, int bytes) native "Socket_ReadList";
422
423 int writeList(List<int> buffer, int offset, int bytes) {
424 if (buffer is! List || offset is! int || bytes is! int) {
425 throw new ArgumentError(
426 "Invalid arguments to writeList on Socket");
427 }
428 if (!_closed) {
429 if (bytes == 0) {
430 return 0;
431 }
432 if (offset < 0) {
433 throw new RangeError.value(offset);
434 }
435 if (bytes < 0) {
436 throw new RangeError.value(bytes);
437 }
438 if ((offset + bytes) > buffer.length) {
439 throw new RangeError.value(offset + bytes);
440 }
441 _BufferAndOffset bufferAndOffset =
442 _ensureFastAndSerializableBuffer(buffer, offset, bytes);
443 var result =
444 _writeList(bufferAndOffset.buffer, bufferAndOffset.offset, bytes);
445 if (result is OSError) {
446 _reportError(result, "Write failed");
447 // If writing fails we return 0 as the number of bytes and
448 // report the error on the error handler.
449 result = 0;
450 }
451 return result;
452 }
453 throw new SocketIOException("writeList failed - invalid socket handle");
454 }
455
456 _writeList(List<int> buffer, int offset, int bytes) native "Socket_WriteList";
457
458 bool _isErrorResponse(response) {
459 return response is List && response[0] != _SUCCESS_RESPONSE;
460 }
461
462 bool _createConnect(String host, int port) native "Socket_CreateConnect";
463
464 void set onWrite(void callback()) {
465 if (_outputStream != null) throw new StreamException(
466 "Cannot set write handler when output stream is used");
467 _clientWriteHandler = callback;
468 _updateOutHandler();
469 }
470
471 void set onConnect(void callback()) {
472 if (_seenFirstOutEvent) {
473 throw new StreamException(
474 "Cannot set connect handler when already connected");
475 }
476 _clientConnectHandler = callback;
477 _updateOutHandler();
478 }
479
480 void set onData(void callback()) {
481 if (_inputStream != null) throw new StreamException(
482 "Cannot set data handler when input stream is used");
483 _onData = callback;
484 }
485
486 void set onClosed(void callback()) {
487 if (_inputStream != null) throw new StreamException(
488 "Cannot set close handler when input stream is used");
489 _onClosed = callback;
490 }
491
492 bool _isListenSocket() => false;
493
494 bool _isPipe() => _pipe;
495
496 InputStream get inputStream {
497 if (_inputStream == null) {
498 if (_handlerMap[_SocketBase._IN_EVENT] != null ||
499 _handlerMap[_SocketBase._CLOSE_EVENT] != null) {
500 throw new StreamException(
501 "Cannot get input stream when socket handlers are used");
502 }
503 _inputStream = new _SocketInputStream(this);
504 }
505 return _inputStream;
506 }
507
508 OutputStream get outputStream {
509 if (_outputStream == null) {
510 if (_clientWriteHandler != null) {
511 throw new StreamException(
512 "Cannot get output stream when socket handlers are used");
513 }
514 _outputStream = new _SocketOutputStream(this);
515 }
516 return _outputStream;
517 }
518
519 void set _onWrite(void callback()) {
520 _setHandler(_SocketBase._OUT_EVENT, callback);
521 }
522
523 void set _onData(void callback()) {
524 _setHandler(_SocketBase._IN_EVENT, callback);
525 }
526
527 void set _onClosed(void callback()) {
528 _setHandler(_SocketBase._CLOSE_EVENT, callback);
529 }
530
531 bool _propagateError(Exception e) {
532 bool reported = false;
533 if (_inputStream != null) {
534 reported = reported || _inputStream._onSocketError(e);
535 }
536 if (_outputStream != null) {
537 reported = reported || _outputStream._onSocketError(e);
538 }
539 return reported;
540 }
541
542 void _updateOutHandler() {
543 void firstWriteHandler() {
544 assert(!_seenFirstOutEvent);
545 _seenFirstOutEvent = true;
546
547 // From now on the write handler is only the client write
548 // handler (connect handler cannot be called again). Change this
549 // before calling any handlers as handlers can change the
550 // handlers.
551 if (_clientWriteHandler == null) _onWrite = _clientWriteHandler;
552
553 // First out event is socket connected event.
554 if (_clientConnectHandler != null) _clientConnectHandler();
555 _clientConnectHandler = null;
556
557 // Always (even for the first out event) call the write handler.
558 if (_clientWriteHandler != null) _clientWriteHandler();
559 }
560
561 if (_clientConnectHandler == null && _clientWriteHandler == null) {
562 _onWrite = null;
563 } else {
564 if (_seenFirstOutEvent) {
565 _onWrite = _clientWriteHandler;
566 } else {
567 _onWrite = firstWriteHandler;
568 }
569 }
570 }
571
572 int get remotePort {
573 if (_remotePort == null) {
574 remoteHost;
575 }
576 return _remotePort;
577 }
578
579 String get remoteHost {
580 if (_remoteHost == null) {
581 List peer = _getRemotePeer();
582 _remoteHost = peer[0];
583 _remotePort = peer[1];
584 }
585 return _remoteHost;
586 }
587
588 List _getRemotePeer() native "Socket_GetRemotePeer";
589
590 static SendPort _newServicePort() native "Socket_NewServicePort";
591
592 static void _ensureSocketService() {
593 if (_socketService == null) {
594 _socketService = _Socket._newServicePort();
595 }
596 }
597
598 bool _seenFirstOutEvent = false;
599 bool _pipe = false;
600 Function _clientConnectHandler;
601 Function _clientWriteHandler;
602 _SocketInputStream _inputStream;
603 _SocketOutputStream _outputStream;
604 String _remoteHost;
605 int _remotePort;
606 static SendPort _socketService;
607 }
OLDNEW
« no previous file with comments | « runtime/bin/socket_macos.cc ('k') | runtime/bin/socket_win.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698