OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 patch class ServerSocket { | 5 patch class RawServerSocket { |
6 /* patch */ factory ServerSocket(String bindAddress, int port, int backlog) { | 6 /* patch */ static Future<RawServerSocket> bind([String address = "127.0.0.1", |
7 return new _ServerSocket(bindAddress, port, backlog); | 7 int port = 0, |
| 8 int backlog = 0]) { |
| 9 return _RawServerSocket.bind(address, port, backlog); |
8 } | 10 } |
9 } | 11 } |
10 | 12 |
11 | 13 |
12 patch class Socket { | 14 patch class RawSocket { |
13 /* patch */ factory Socket(String host, int port) => new _Socket(host, port); | 15 /* patch */ static Future<RawSocket> connect(String host, int port) { |
| 16 return _RawSocket.connect(host, port); |
| 17 } |
14 } | 18 } |
15 | 19 |
16 | 20 |
17 class _SocketBase extends NativeFieldWrapperClass1 { | 21 // The _NativeSocket class encapsulates an OS socket. |
| 22 class _NativeSocket extends NativeFieldWrapperClass1 { |
18 // Bit flags used when communicating between the eventhandler and | 23 // Bit flags used when communicating between the eventhandler and |
19 // dart code. The EVENT flags are used to indicate events of | 24 // dart code. The EVENT flags are used to indicate events of |
20 // interest when sending a message from dart code to the | 25 // interest when sending a message from dart code to the |
21 // eventhandler. When receiving a message from the eventhandler the | 26 // eventhandler. When receiving a message from the eventhandler the |
22 // EVENT flags indicate the events that actually happened. The | 27 // EVENT flags indicate the events that actually happened. The |
23 // COMMAND flags are used to send commands from dart to the | 28 // COMMAND flags are used to send commands from dart to the |
24 // eventhandler. COMMAND flags are never received from the | 29 // eventhandler. COMMAND flags are never received from the |
25 // eventhandler. Additional flags are used to communicate other | 30 // eventhandler. Additional flags are used to communicate other |
26 // information. | 31 // information. |
27 static const int _IN_EVENT = 0; | 32 static const int READ_EVENT = 0; |
28 static const int _OUT_EVENT = 1; | 33 static const int WRITE_EVENT = 1; |
29 static const int _ERROR_EVENT = 2; | 34 static const int ERROR_EVENT = 2; |
30 static const int _CLOSE_EVENT = 3; | 35 static const int CLOSED_EVENT = 3; |
31 | 36 static const int FIRST_EVENT = READ_EVENT; |
32 static const int _CLOSE_COMMAND = 8; | 37 static const int LAST_EVENT = CLOSED_EVENT; |
33 static const int _SHUTDOWN_READ_COMMAND = 9; | 38 static const int EVENT_COUNT = LAST_EVENT - FIRST_EVENT + 1; |
34 static const int _SHUTDOWN_WRITE_COMMAND = 10; | 39 |
35 | 40 static const int CLOSE_COMMAND = 8; |
36 // Flag send to the eventhandler providing additional information on | 41 static const int SHUTDOWN_READ_COMMAND = 9; |
37 // the type of the file descriptor. | 42 static const int SHUTDOWN_WRITE_COMMAND = 10; |
38 static const int _LISTENING_SOCKET = 16; | 43 static const int FIRST_COMMAND = CLOSE_COMMAND; |
39 static const int _PIPE = 17; | 44 static const int LAST_COMMAND = SHUTDOWN_WRITE_COMMAND; |
40 | 45 |
41 static const int _FIRST_EVENT = _IN_EVENT; | 46 // Type flag send to the eventhandler providing additional |
42 static const int _LAST_EVENT = _CLOSE_EVENT; | 47 // information on the type of the file descriptor. |
43 | 48 static const int LISTENING_SOCKET = 16; |
44 static const int _FIRST_COMMAND = _CLOSE_COMMAND; | 49 static const int PIPE_SOCKET = 17; |
45 static const int _LAST_COMMAND = _SHUTDOWN_WRITE_COMMAND; | 50 static const int TYPE_NORMAL_SOCKET = 0; |
46 | 51 static const int TYPE_LISTENING_SOCKET = 1 << LISTENING_SOCKET; |
47 _SocketBase () { | 52 static const int TYPE_PIPE = 1 << PIPE_SOCKET; |
48 _handlerMap = new List.fixedLength(_LAST_EVENT + 1); | 53 |
49 _handlerMask = 0; | 54 // Native port messages. |
50 _canActivateHandlers = true; | 55 static const HOST_NAME_LOOKUP = 0; |
51 _closed = true; | 56 |
| 57 // Socket close state |
| 58 bool isClosed = false; |
| 59 bool isClosedRead = false; |
| 60 bool isClosedWrite = false; |
| 61 Completer closeCompleter = new Completer(); |
| 62 |
| 63 // Handlers and receive port for socket events from the event handler. |
| 64 int eventMask = 0; |
| 65 List eventHandlers; |
| 66 ReceivePort eventPort; |
| 67 |
| 68 // Indicates if native interrupts can be activated. |
| 69 bool canActivateEvents = true; |
| 70 |
| 71 // The type flags for this socket. |
| 72 final int typeFlags; |
| 73 |
| 74 // Holds the port of the socket, null if not known. |
| 75 int localPort; |
| 76 |
| 77 // Native port for socket services. |
| 78 static SendPort socketService; |
| 79 |
| 80 static Future<_NativeSocket> connect(String host, int port) { |
| 81 var completer = new Completer(); |
| 82 ensureSocketService(); |
| 83 socketService.call([HOST_NAME_LOOKUP, host]).then((response) { |
| 84 if (isErrorResponse(response)) { |
| 85 completer.completeError( |
| 86 createError(response, "Failed host name lookup")); |
| 87 } else { |
| 88 var socket = new _NativeSocket.normal(); |
| 89 var result = socket.nativeCreateConnect(response, port); |
| 90 if (result is OSError) { |
| 91 completer.completeError(createError(result, "Connection failed")); |
| 92 } else { |
| 93 // Setup handlers for receiving the first write event which |
| 94 // indicate that the socket is fully connected. |
| 95 socket.setHandlers( |
| 96 write: () { |
| 97 socket.setListening(read: false, write: false); |
| 98 completer.complete(socket); |
| 99 }, |
| 100 error: (e) { |
| 101 socket.close(); |
| 102 completer.completeError(createError(e, "Connection failed")); |
| 103 } |
| 104 ); |
| 105 socket.setListening(read: false, write: true); |
| 106 } |
| 107 } |
| 108 }); |
| 109 return completer.future; |
| 110 } |
| 111 |
| 112 static Future<_NativeSocket> bind(String address, |
| 113 int port, |
| 114 int backlog) { |
| 115 var socket = new _NativeSocket.listen(); |
| 116 var result = socket.nativeCreateBindListen(address, port, backlog); |
| 117 if (result is OSError) { |
| 118 return new Future.immediateError( |
| 119 new SocketIOException("Failed to create server socket", result)); |
| 120 } |
| 121 if (port != 0) socket.localPort = port; |
| 122 return new Future.immediate(socket); |
| 123 } |
| 124 |
| 125 _NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET { |
| 126 eventHandlers = new List.fixedLength(EVENT_COUNT + 1); |
52 _EventHandler._start(); | 127 _EventHandler._start(); |
53 _hashCode = _nextHashCode; | 128 } |
54 _nextHashCode = (_nextHashCode + 1) & 0xFFFFFFF; | 129 |
| 130 _NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET { |
| 131 eventHandlers = new List.fixedLength(EVENT_COUNT + 1); |
| 132 _EventHandler._start(); |
| 133 } |
| 134 |
| 135 _NativeSocket.pipe() : typeFlags = TYPE_PIPE { |
| 136 eventHandlers = new List.fixedLength(EVENT_COUNT + 1); |
| 137 _EventHandler._start(); |
| 138 } |
| 139 |
| 140 int available() { |
| 141 if (isClosed) return 0; |
| 142 var result = nativeAvailable(); |
| 143 if (result is OSError) { |
| 144 reportError(result, "Available failed"); |
| 145 return 0; |
| 146 } else { |
| 147 return result; |
| 148 } |
| 149 } |
| 150 |
| 151 List<int> read(int len) { |
| 152 if (len != null && len <= 0) { |
| 153 throw new ArgumentError("Illegal length $len"); |
| 154 } |
| 155 var result = nativeRead(len == null ? -1 : len); |
| 156 if (result is OSError) { |
| 157 reportError(result, "Read failed"); |
| 158 return null; |
| 159 } |
| 160 return result; |
| 161 } |
| 162 |
| 163 int write(List<int> buffer, int offset, int bytes) { |
| 164 if (buffer is! List) throw new ArgumentError(); |
| 165 if (offset == null) offset = 0; |
| 166 if (bytes == null) bytes = buffer.length; |
| 167 if (offset < 0) throw new RangeError.value(offset); |
| 168 if (bytes < 0) throw new RangeError.value(bytes); |
| 169 if ((offset + bytes) > buffer.length) { |
| 170 throw new RangeError.value(offset + bytes); |
| 171 } |
| 172 if (offset is! int || bytes is! int) { |
| 173 throw new ArgumentError("Invalid arguments to write on Socket"); |
| 174 } |
| 175 if (isClosed) return 0; |
| 176 if (bytes == 0) return 0; |
| 177 _BufferAndOffset bufferAndOffset = |
| 178 _ensureFastAndSerializableBuffer(buffer, offset, bytes); |
| 179 var result = |
| 180 nativeWrite(bufferAndOffset.buffer, bufferAndOffset.offset, bytes); |
| 181 if (result is OSError) { |
| 182 reportError(result, "Write failed"); |
| 183 result = 0; |
| 184 } |
| 185 return result; |
| 186 } |
| 187 |
| 188 _NativeSocket accept() { |
| 189 var socket = new _NativeSocket.normal(); |
| 190 if (nativeAccept(socket) != true) return null; |
| 191 return socket; |
| 192 } |
| 193 |
| 194 int get port { |
| 195 if (localPort != null) return localPort; |
| 196 return localPort = nativeGetPort(); |
| 197 } |
| 198 |
| 199 int get remotePort { |
| 200 return nativeGetRemotePeer()[1]; |
| 201 } |
| 202 |
| 203 String get remoteHost { |
| 204 return nativeGetRemotePeer()[0]; |
55 } | 205 } |
56 | 206 |
57 // Multiplexes socket events to the socket handlers. | 207 // Multiplexes socket events to the socket handlers. |
58 void _multiplex(int event_mask) { | 208 void multiplex(int events) { |
59 _canActivateHandlers = false; | 209 canActivateEvents = false; |
60 for (int i = _FIRST_EVENT; i <= _LAST_EVENT; i++) { | 210 for (int i = FIRST_EVENT; i <= LAST_EVENT; i++) { |
61 if (((event_mask & (1 << i)) != 0)) { | 211 if (((events & (1 << i)) != 0)) { |
62 if ((i == _CLOSE_EVENT) && this is _Socket && !_closed) { | 212 if (i == CLOSED_EVENT && |
63 _closedRead = true; | 213 typeFlags != TYPE_LISTENING_SOCKET && |
64 if (_closedWrite) _close(); | 214 !isClosed) { |
| 215 isClosedRead = true; |
65 } | 216 } |
66 | 217 |
67 var eventHandler = _handlerMap[i]; | 218 var handler = eventHandlers[i]; |
68 if (eventHandler != null || i == _ERROR_EVENT) { | 219 assert(handler != null); |
| 220 if (i == WRITE_EVENT) { |
| 221 // If the event was disabled before we had a chance to fire the event, |
| 222 // discard it. If we register again, we'll get a new one. |
| 223 if ((eventMask & (1 << i)) == 0) continue; |
69 // Unregister the out handler before executing it. There is | 224 // Unregister the out handler before executing it. There is |
70 // no need to notify the eventhandler as handlers are | 225 // no need to notify the eventhandler as handlers are |
71 // disabled while the event is handled. | 226 // disabled while the event is handled. |
72 if (i == _OUT_EVENT) _setHandler(i, null, notifyEventhandler: false); | 227 eventMask &= ~(1 << i); |
73 | |
74 // Don't call the in handler if there is no data available | |
75 // after all. | |
76 if ((i == _IN_EVENT) && (this is _Socket) && (available() == 0)) { | |
77 continue; | |
78 } | |
79 if (i == _ERROR_EVENT) { | |
80 _reportError(_getError(), ""); | |
81 close(); | |
82 } else { | |
83 eventHandler(); | |
84 } | |
85 } | 228 } |
86 } | 229 |
87 } | 230 // Don't call the in handler if there is no data available |
88 _canActivateHandlers = true; | 231 // after all. |
89 _activateHandlers(); | 232 if (i == READ_EVENT && |
90 } | 233 typeFlags != TYPE_LISTENING_SOCKET && |
91 | 234 available() == 0) { |
92 void _setHandler(int event, | 235 continue; |
93 Function callback, | |
94 {bool notifyEventhandler: true}) { | |
95 if (callback == null) { | |
96 _handlerMask &= ~(1 << event); | |
97 } else { | |
98 _handlerMask |= (1 << event); | |
99 } | |
100 _handlerMap[event] = callback; | |
101 // If the socket is only for writing then close the receive port | |
102 // when not waiting for any events. | |
103 if (this is _Socket && | |
104 _closedRead && | |
105 _handlerMask == 0 && | |
106 _handler != null) { | |
107 _handler.close(); | |
108 _handler = null; | |
109 } else { | |
110 if (notifyEventhandler) _activateHandlers(); | |
111 } | |
112 } | |
113 | |
114 OSError _getError() native "Socket_GetError"; | |
115 | |
116 int _getPort() native "Socket_GetPort"; | |
117 | |
118 void set onError(void callback(e)) { | |
119 _setHandler(_ERROR_EVENT, callback); | |
120 } | |
121 | |
122 void _activateHandlers() { | |
123 if (_canActivateHandlers && !_closed) { | |
124 if (_handlerMask == 0) { | |
125 if (_handler != null) { | |
126 _handler.close(); | |
127 _handler = null; | |
128 } | 236 } |
129 return; | 237 if (i == ERROR_EVENT) { |
130 } | 238 reportError(nativeGetError(), ""); |
131 int data = _handlerMask; | 239 } else if (!isClosed) { |
132 if (_isListenSocket()) { | 240 handler(); |
133 data |= (1 << _LISTENING_SOCKET); | 241 } |
| 242 } |
| 243 } |
| 244 if (isClosedRead && isClosedWrite) close(); |
| 245 canActivateEvents = true; |
| 246 activateHandlers(); |
| 247 } |
| 248 |
| 249 void setHandlers({read: null, write: null, error: null, closed: null}) { |
| 250 eventHandlers[READ_EVENT] = read; |
| 251 eventHandlers[WRITE_EVENT] = write; |
| 252 eventHandlers[ERROR_EVENT] = error; |
| 253 eventHandlers[CLOSED_EVENT] = closed; |
| 254 } |
| 255 |
| 256 void setListening({read: true, write: true}) { |
| 257 eventMask = (1 << CLOSED_EVENT) | (1 << ERROR_EVENT); |
| 258 if (read) eventMask |= (1 << READ_EVENT); |
| 259 if (write) eventMask |= (1 << WRITE_EVENT); |
| 260 activateHandlers(); |
| 261 } |
| 262 |
| 263 Future get closeFuture => closeCompleter.future; |
| 264 |
| 265 void activateHandlers() { |
| 266 if (canActivateEvents && !isClosed) { |
| 267 // If we don't listen for either read or write, disconnect as we won't |
| 268 // get close and error events anyway. |
| 269 if ((eventMask & ((1 << READ_EVENT) | (1 << WRITE_EVENT))) == 0) { |
| 270 if (eventPort != null) disconnectFromEventHandler(); |
134 } else { | 271 } else { |
135 if (_closedRead) { data &= ~(1 << _IN_EVENT); } | 272 int data = eventMask; |
136 if (_closedWrite) { data &= ~(1 << _OUT_EVENT); } | 273 data |= typeFlags; |
137 if (_isPipe()) data |= (1 << _PIPE); | 274 if (isClosedRead) data &= ~(1 << READ_EVENT); |
138 } | 275 if (isClosedWrite) data &= ~(1 << WRITE_EVENT); |
139 _sendToEventHandler(data); | 276 sendToEventHandler(data); |
140 } | 277 } |
141 } | 278 } |
142 | 279 } |
143 int get port { | 280 |
144 if (_port == null) { | 281 void close() { |
145 _port = _getPort(); | 282 if (!isClosed) { |
146 } | 283 sendToEventHandler(1 << CLOSE_COMMAND); |
147 return _port; | 284 isClosed = true; |
148 } | 285 closeCompleter.complete(this); |
149 | 286 } |
150 void close([bool halfClose = false]) { | 287 // Outside the if support closing sockets created but never |
151 if (!_closed) { | 288 // assigned any actual socket. |
152 if (halfClose) { | 289 disconnectFromEventHandler(); |
153 _closeWrite(); | 290 } |
| 291 |
| 292 void shutdown(SocketDirection direction) { |
| 293 if (!isClosed) { |
| 294 switch (direction) { |
| 295 case SocketDirection.RECEIVE: |
| 296 shutdownRead(); |
| 297 break; |
| 298 case SocketDirection.SEND: |
| 299 shutdownWrite(); |
| 300 break; |
| 301 case SocketDirection.BOTH: |
| 302 close(); |
| 303 break; |
| 304 default: |
| 305 throw new ArgumentError(direction); |
| 306 } |
| 307 } |
| 308 } |
| 309 |
| 310 void shutdownWrite() { |
| 311 if (!isClosed) { |
| 312 if (isClosedRead) { |
| 313 close(); |
154 } else { | 314 } else { |
155 _close(); | 315 sendToEventHandler(1 << SHUTDOWN_WRITE_COMMAND); |
156 } | 316 } |
157 } else if (_handler != null) { | 317 isClosedWrite = true; |
158 // This is to support closing sockets created but never assigned | 318 } |
159 // any actual socket. | 319 } |
160 _handler.close(); | 320 |
161 _handler = null; | 321 void shutdownRead() { |
162 } | 322 if (!isClosed) { |
163 } | 323 if (isClosedWrite) { |
164 | 324 close(); |
165 void _closeWrite() { | |
166 if (!_closed) { | |
167 if (_closedRead) { | |
168 _close(); | |
169 } else { | 325 } else { |
170 _sendToEventHandler(1 << _SHUTDOWN_WRITE_COMMAND); | 326 sendToEventHandler(1 << SHUTDOWN_READ_COMMAND); |
171 } | 327 } |
172 _closedWrite = true; | 328 isClosedRead = true; |
173 } | 329 } |
174 } | 330 } |
175 | 331 |
176 void _closeRead() { | 332 void sendToEventHandler(int data) { |
177 if (!_closed) { | 333 connectToEventHandler(); |
178 if (_closedWrite) { | 334 assert(!isClosed); |
179 _close(); | 335 _EventHandler._sendData(this, eventPort, data); |
180 } else { | 336 } |
181 _sendToEventHandler(1 << _SHUTDOWN_READ_COMMAND); | 337 |
182 } | 338 void connectToEventHandler() { |
183 _closedRead = true; | 339 if (eventPort == null) { |
184 } | 340 eventPort = new ReceivePort(); |
185 } | 341 eventPort.receive ((var message, _) => multiplex(message)); |
186 | 342 } |
187 void _close() { | 343 } |
188 if (!_closed) { | 344 |
189 _sendToEventHandler(1 << _CLOSE_COMMAND); | 345 void disconnectFromEventHandler() { |
190 _handler.close(); | 346 if (eventPort != null) { |
191 _handler = null; | 347 eventPort.close(); |
192 _closed = true; | 348 eventPort = null; |
193 } | 349 } |
194 } | 350 } |
195 | 351 |
196 void _sendToEventHandler(int data) { | 352 static void ensureSocketService() { |
197 if (_handler == null) { | 353 if (socketService == null) { |
198 _handler = new ReceivePort(); | 354 socketService = _NativeSocket.newServicePort(); |
199 _handler.receive((var message, ignored) { _multiplex(message); }); | 355 } |
200 } | 356 } |
201 assert(!_closed); | 357 |
202 _EventHandler._sendData(this, _handler, data); | 358 // Check whether this is an error response from a native port call. |
203 } | 359 static bool isErrorResponse(response) { |
204 | 360 return response is List && response[0] != _SUCCESS_RESPONSE; |
205 bool _reportError(error, String message) { | 361 } |
206 void doReportError(Exception e) { | 362 |
207 // Invoke the socket error callback if any. | 363 // Create the appropriate error/exception from different returned |
208 bool reported = false; | 364 // error objects. |
209 if (_handlerMap[_ERROR_EVENT] != null) { | 365 static createError(error, String message) { |
210 _handlerMap[_ERROR_EVENT](e); | |
211 reported = true; | |
212 } | |
213 // Propagate the error to any additional listeners. | |
214 reported = reported || _propagateError(e); | |
215 if (!reported) throw e; | |
216 } | |
217 | |
218 // For all errors we close the socket, call the error handler and | |
219 // disable further calls of the error handler. | |
220 close(); | |
221 if (error is OSError) { | 366 if (error is OSError) { |
222 doReportError(new SocketIOException(message, error)); | 367 return new SocketIOException(message, error); |
223 } else if (error is List) { | 368 } else if (error is List) { |
224 assert(_isErrorResponse(error)); | 369 assert(isErrorResponse(error)); |
225 switch (error[0]) { | 370 switch (error[0]) { |
226 case _ILLEGAL_ARGUMENT_RESPONSE: | 371 case _ILLEGAL_ARGUMENT_RESPONSE: |
227 doReportError(new ArgumentError()); | 372 return new ArgumentError(); |
228 break; | |
229 case _OSERROR_RESPONSE: | 373 case _OSERROR_RESPONSE: |
230 doReportError(new SocketIOException( | 374 return new SocketIOException( |
231 message, new OSError(error[2], error[1]))); | 375 message, new OSError(error[2], error[1])); |
232 break; | |
233 default: | 376 default: |
234 doReportError(new Exception("Unknown error")); | 377 return new Exception("Unknown error"); |
235 break; | |
236 } | 378 } |
237 } else { | 379 } else { |
238 doReportError(new SocketIOException(message)); | 380 return new SocketIOException(message); |
239 } | 381 } |
240 } | 382 } |
241 | 383 |
242 int get hashCode => _hashCode; | 384 void reportError(error, String message) { |
243 | 385 var e = createError(error, message); |
244 bool _propagateError(Exception e) => false; | 386 // Invoke the error handler if any. |
245 | 387 if (eventHandlers[ERROR_EVENT] != null) { |
246 bool _isListenSocket(); | 388 eventHandlers[ERROR_EVENT](e); |
247 bool _isPipe(); | 389 } |
248 | 390 // For all errors we close the socket |
249 // Is this socket closed. | 391 close(); |
250 bool _closed; | 392 } |
251 | 393 |
252 // Dedicated ReceivePort for socket events. | 394 nativeAvailable() native "Socket_Available"; |
253 ReceivePort _handler; | 395 nativeRead(int len) native "Socket_Read"; |
254 | 396 nativeWrite(List<int> buffer, int offset, int bytes) |
255 // Poll event to handler map. | 397 native "Socket_WriteList"; |
256 List _handlerMap; | 398 bool nativeCreateConnect(String host, int port) native "Socket_CreateConnect"; |
257 | 399 nativeCreateBindListen(String address, int port, int backlog) |
258 // Indicates for which poll events the socket registered handlers. | |
259 int _handlerMask; | |
260 | |
261 // Indicates if native interrupts can be activated. | |
262 bool _canActivateHandlers; | |
263 | |
264 // Holds the port of the socket, null if not known. | |
265 int _port; | |
266 | |
267 // Hash code for the socket. Currently this is just a counter. | |
268 int _hashCode; | |
269 static int _nextHashCode = 0; | |
270 bool _closedRead = false; | |
271 bool _closedWrite = false; | |
272 } | |
273 | |
274 | |
275 class _ServerSocket extends _SocketBase implements ServerSocket { | |
276 // Constructor for server socket. First a socket object is allocated | |
277 // in which the native socket is stored. After that _createBind | |
278 // is called which creates a file descriptor and binds the given address | |
279 // and port to the socket. Null is returned if file descriptor creation or | |
280 // bind failed. | |
281 factory _ServerSocket(String bindAddress, int port, int backlog) { | |
282 _ServerSocket socket = new _ServerSocket._internal(); | |
283 var result = socket._createBindListen(bindAddress, port, backlog); | |
284 if (result is OSError) { | |
285 socket.close(); | |
286 throw new SocketIOException("Failed to create server socket", result); | |
287 } | |
288 socket._closed = false; | |
289 assert(result); | |
290 if (port != 0) { | |
291 socket._port = port; | |
292 } | |
293 return socket; | |
294 } | |
295 | |
296 _ServerSocket._internal(); | |
297 | |
298 _accept(Socket socket) native "ServerSocket_Accept"; | |
299 | |
300 _createBindListen(String bindAddress, int port, int backlog) | |
301 native "ServerSocket_CreateBindListen"; | 400 native "ServerSocket_CreateBindListen"; |
302 | 401 nativeAccept(_NativeSocket socket) native "ServerSocket_Accept"; |
303 void set onConnection(void callback(Socket connection)) { | 402 int nativeGetPort() native "Socket_GetPort"; |
304 _clientConnectionHandler = callback; | 403 List nativeGetRemotePeer() native "Socket_GetRemotePeer"; |
305 _setHandler(_SocketBase._IN_EVENT, | 404 OSError nativeGetError() native "Socket_GetError"; |
306 _clientConnectionHandler != null ? _connectionHandler : null); | 405 |
307 } | 406 static SendPort newServicePort() native "Socket_NewServicePort"; |
308 | 407 } |
309 void _connectionHandler() { | 408 |
310 if (!_closed) { | 409 |
311 _Socket socket = new _Socket._internal(); | 410 class _RawServerSocket extends Stream<RawSocket> |
312 var result = _accept(socket); | 411 implements RawServerSocket { |
313 if (result is OSError) { | 412 final _NativeSocket _socket; |
314 _reportError(result, "Accept failed"); | 413 StreamController<RawSocket> _controller; |
315 } else if (result) { | 414 |
316 socket._closed = false; | 415 static Future<_RawServerSocket> bind(String address, |
317 _clientConnectionHandler(socket); | 416 int port, |
| 417 int backlog) { |
| 418 if (port < 0 || port > 0xFFFF) |
| 419 throw new ArgumentError("Invalid port $port"); |
| 420 if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog"); |
| 421 return _NativeSocket.bind(address, port, backlog) |
| 422 .then((socket) => new _RawServerSocket(socket)); |
| 423 } |
| 424 |
| 425 _RawServerSocket(this._socket) { |
| 426 _controller = new StreamController( |
| 427 onSubscriptionStateChange: _onSubscriptionStateChange, |
| 428 onPauseStateChange: _onPauseStateChange); |
| 429 _socket.closeFuture.then((_) => _controller.close()); |
| 430 _socket.setHandlers( |
| 431 read: () { |
| 432 var socket = _socket.accept(); |
| 433 if (socket != null) _controller.add(new _RawSocket(socket)); |
| 434 }, |
| 435 error: (e) { |
| 436 _controller.signalError(new AsyncError(e)); |
| 437 _controller.close(); |
| 438 } |
| 439 ); |
| 440 } |
| 441 |
| 442 StreamSubscription<RawSocket> listen(void onData(RawSocket event), |
| 443 {void onError(AsyncError error), |
| 444 void onDone(), |
| 445 bool unsubscribeOnError}) { |
| 446 return _controller.stream.listen( |
| 447 onData, |
| 448 onError: onError, |
| 449 onDone: onDone, |
| 450 unsubscribeOnError: unsubscribeOnError); |
| 451 } |
| 452 |
| 453 int get port => _socket.port; |
| 454 |
| 455 void close() => _socket.close(); |
| 456 |
| 457 void _pause() { |
| 458 _socket.setListening(read: false, write: false); |
| 459 } |
| 460 |
| 461 void _resume() { |
| 462 _socket.setListening(read: true, write: false); |
| 463 } |
| 464 |
| 465 void _onSubscriptionStateChange() { |
| 466 if (_controller.hasSubscribers) { |
| 467 _resume(); |
| 468 } else { |
| 469 close(); |
| 470 } |
| 471 } |
| 472 void _onPauseStateChange() { |
| 473 if (_controller.isPaused) { |
| 474 _pause(); |
| 475 } else { |
| 476 _resume(); |
| 477 } |
| 478 } |
| 479 } |
| 480 |
| 481 |
| 482 class _RawSocket extends Stream<RawSocketEvent> |
| 483 implements RawSocket { |
| 484 final _NativeSocket _socket; |
| 485 StreamController<RawSocketEvent> _controller; |
| 486 bool _readEventsEnabled = true; |
| 487 bool _writeEventsEnabled = true; |
| 488 |
| 489 static Future<RawSocket> connect(String host, int port) { |
| 490 return _NativeSocket.connect(host, port) |
| 491 .then((socket) => new _RawSocket(socket)); |
| 492 } |
| 493 |
| 494 _RawSocket(this._socket) { |
| 495 _controller = new StreamController( |
| 496 onSubscriptionStateChange: _onSubscriptionStateChange, |
| 497 onPauseStateChange: _onPauseStateChange); |
| 498 _socket.closeFuture.then((_) => _controller.close()); |
| 499 _socket.setHandlers( |
| 500 read: () => _controller.add(RawSocketEvent.READ), |
| 501 write: () { |
| 502 // The write event handler is automatically disabled by the |
| 503 // event handler when it fires. |
| 504 _writeEventsEnabled = false; |
| 505 _controller.add(RawSocketEvent.WRITE); |
| 506 }, |
| 507 closed: () => _controller.add(RawSocketEvent.READ_CLOSED), |
| 508 error: (e) { |
| 509 _controller.signalError(new AsyncError(e)); |
| 510 close(); |
| 511 } |
| 512 ); |
| 513 } |
| 514 |
| 515 factory _RawSocket._writePipe(int fd) { |
| 516 var native = new _NativeSocket.pipe(); |
| 517 native.isClosedRead = true; |
| 518 if (fd != null) _getStdioHandle(native, fd); |
| 519 return new _RawSocket(native); |
| 520 } |
| 521 |
| 522 factory _RawSocket._readPipe(int fd) { |
| 523 var native = new _NativeSocket.pipe(); |
| 524 native.isClosedWrite = true; |
| 525 if (fd != null) _getStdioHandle(native, fd); |
| 526 return new _RawSocket(native); |
| 527 } |
| 528 |
| 529 StreamSubscription<RawSocketEvent> listen(void onData(RawSocketEvent event), |
| 530 {void onError(AsyncError error), |
| 531 void onDone(), |
| 532 bool unsubscribeOnError}) { |
| 533 return _controller.stream.listen( |
| 534 onData, |
| 535 onError: onError, |
| 536 onDone: onDone, |
| 537 unsubscribeOnError: unsubscribeOnError); |
| 538 } |
| 539 |
| 540 int available() => _socket.available(); |
| 541 |
| 542 List<int> read([int len]) => _socket.read(len); |
| 543 |
| 544 int write(List<int> buffer, [int offset, int count]) => |
| 545 _socket.write(buffer, offset, count); |
| 546 |
| 547 void close() => _socket.close(); |
| 548 |
| 549 void shutdown(SocketDirection direction) => _socket.shutdown(direction); |
| 550 |
| 551 int get port => _socket.port; |
| 552 |
| 553 int get remotePort => _socket.remotePort; |
| 554 |
| 555 String get remoteHost => _socket.remoteHost; |
| 556 |
| 557 bool get readEventsEnabled => _readEventsEnabled; |
| 558 void set readEventsEnabled(bool value) { |
| 559 if (value != _readEventsEnabled) { |
| 560 _readEventsEnabled = value; |
| 561 if (!_controller.isPaused) _resume(); |
| 562 } |
| 563 } |
| 564 |
| 565 bool get writeEventsEnabled => _writeEventsEnabled; |
| 566 void set writeEventsEnabled(bool value) { |
| 567 if (value != _writeEventsEnabled) { |
| 568 _writeEventsEnabled = value; |
| 569 if (!_controller.isPaused) _resume(); |
| 570 } |
| 571 } |
| 572 |
| 573 _pause() { |
| 574 _socket.setListening(read: false, write: false); |
| 575 } |
| 576 |
| 577 void _resume() { |
| 578 _socket.setListening(read: _readEventsEnabled, write: _writeEventsEnabled); |
| 579 } |
| 580 |
| 581 void _onPauseStateChange() { |
| 582 if (_controller.isPaused) { |
| 583 _pause(); |
| 584 } else { |
| 585 _resume(); |
| 586 } |
| 587 } |
| 588 |
| 589 void _onSubscriptionStateChange() { |
| 590 if (_controller.hasSubscribers) { |
| 591 _resume(); |
| 592 } else { |
| 593 close(); |
| 594 } |
| 595 } |
| 596 } |
| 597 |
| 598 |
| 599 patch class ServerSocket { |
| 600 /* patch */ static Future<ServerSocket> bind([String address = "127.0.0.1", |
| 601 int port = 0, |
| 602 int backlog = 0]) { |
| 603 return _ServerSocket.bind(address, port, backlog); |
| 604 } |
| 605 } |
| 606 |
| 607 class _ServerSocket extends Stream<Socket> |
| 608 implements ServerSocket { |
| 609 final _socket; |
| 610 |
| 611 static Future<_ServerSocket> bind(String address, |
| 612 int port, |
| 613 int backlog) { |
| 614 return _RawServerSocket.bind(address, port, backlog) |
| 615 .then((socket) => new _ServerSocket(socket)); |
| 616 } |
| 617 |
| 618 _ServerSocket(this._socket); |
| 619 |
| 620 StreamSubscription<Socket> listen(void onData(Socket event), |
| 621 {void onError(AsyncError error), |
| 622 void onDone(), |
| 623 bool unsubscribeOnError}) { |
| 624 return _socket.map((rawSocket) => new _Socket(rawSocket)).listen( |
| 625 onData, |
| 626 onError: onError, |
| 627 onDone: onDone, |
| 628 unsubscribeOnError: unsubscribeOnError); |
| 629 } |
| 630 |
| 631 int get port => _socket.port; |
| 632 |
| 633 void close() => _socket.close(); |
| 634 } |
| 635 |
| 636 |
| 637 patch class Socket { |
| 638 /* patch */ static Future<Socket> connect(String host, int port) { |
| 639 return RawSocket.connect(host, port).then( |
| 640 (socket) => new _Socket(socket)); |
| 641 } |
| 642 } |
| 643 |
| 644 |
| 645 patch class SecureSocket { |
| 646 /* patch */ factory SecureSocket._(RawSecureSocket rawSocket) => |
| 647 new _SecureSocket(rawSocket); |
| 648 } |
| 649 |
| 650 |
| 651 class _SocketStreamConsumer extends StreamConsumer<List<int>, Socket> { |
| 652 StreamSubscription subscription; |
| 653 final _Socket socket; |
| 654 int offset; |
| 655 List<int> buffer; |
| 656 bool paused = false; |
| 657 |
| 658 _SocketStreamConsumer(this.socket); |
| 659 |
| 660 Future<Socket> consume(Stream<List<int>> stream) { |
| 661 subscription = stream.listen( |
| 662 (data) { |
| 663 assert(!paused); |
| 664 assert(buffer == null); |
| 665 buffer = data; |
| 666 offset = 0; |
| 667 write(); |
| 668 }, |
| 669 onDone: () { |
| 670 socket._consumerDone(); |
| 671 }); |
| 672 return socket._doneFuture; |
| 673 } |
| 674 |
| 675 void write() { |
| 676 try { |
| 677 if (subscription == null) return; |
| 678 assert(buffer != null); |
| 679 // Write as much as possible. |
| 680 offset += socket._write(buffer, offset, buffer.length - offset); |
| 681 if (offset < buffer.length) { |
| 682 if (!paused) { |
| 683 paused = true; |
| 684 // TODO(ajohnsen): It would be nice to avoid this check. |
| 685 // Some info: socket._write can emit an event, if it fails to write. |
| 686 // If the user closes the socket in that event, stop() will be called |
| 687 // before we get a change to pause. |
| 688 if (subscription == null) return; |
| 689 subscription.pause(); |
| 690 } |
| 691 socket._enableWriteEvent(); |
318 } else { | 692 } else { |
319 // Temporary failure accepting the connection. Ignoring | 693 buffer = null; |
320 // temporary failures lets us retry when we wake up with data | 694 if (paused) { |
321 // on the listening socket again. | 695 paused = false; |
322 } | 696 subscription.resume(); |
323 } | |
324 } | |
325 | |
326 bool _isListenSocket() => true; | |
327 bool _isPipe() => false; | |
328 | |
329 var _clientConnectionHandler; | |
330 } | |
331 | |
332 | |
333 class _Socket extends _SocketBase implements Socket { | |
334 static const HOST_NAME_LOOKUP = 0; | |
335 | |
336 // Constructs a new socket. During the construction an asynchronous | |
337 // host name lookup is initiated. The returned socket is not yet | |
338 // connected but ready for registration of callbacks. | |
339 factory _Socket(String host, int port) { | |
340 Socket socket = new _Socket._internal(); | |
341 _ensureSocketService(); | |
342 List request = new List.fixedLength(2); | |
343 request[0] = HOST_NAME_LOOKUP; | |
344 request[1] = host; | |
345 _socketService.call(request).then((response) { | |
346 if (socket._isErrorResponse(response)) { | |
347 socket._reportError(response, "Failed host name lookup"); | |
348 } else{ | |
349 var result = socket._createConnect(response, port); | |
350 if (result is OSError) { | |
351 socket.close(); | |
352 socket._reportError(result, "Connection failed"); | |
353 } else { | |
354 socket._closed = false; | |
355 socket._activateHandlers(); | |
356 } | 697 } |
357 } | 698 } |
358 }); | 699 } catch (e) { |
359 return socket; | 700 socket._consumerDone(e); |
360 } | 701 } |
361 | 702 } |
362 _Socket._internal(); | 703 |
363 _Socket._internalReadOnly() : _pipe = true { super._closedWrite = true; } | 704 void stop() { |
364 _Socket._internalWriteOnly() : _pipe = true { super._closedRead = true; } | 705 if (subscription == null) return; |
365 | 706 subscription.cancel(); |
366 int available() { | 707 subscription = null; |
367 if (!_closed) { | 708 socket._disableWriteEvent(); |
368 var result = _available(); | 709 } |
369 if (result is OSError) { | 710 } |
370 _reportError(result, "Available failed"); | 711 |
371 return 0; | 712 |
| 713 class _Socket extends Stream<List<int>> implements Socket { |
| 714 RawSocket _raw; // Set to null when the raw socket is closed. |
| 715 bool _closed = false; // Set to true when the raw socket is closed. |
| 716 StreamController _controller; |
| 717 bool _controllerClosed = false; |
| 718 _SocketStreamConsumer _consumer; |
| 719 IOSink<Socket> _sink; |
| 720 Completer _doneCompleter; |
| 721 var _subscription; |
| 722 |
| 723 _Socket(RawSocket this._raw) { |
| 724 _controller = new StreamController<List<int>>( |
| 725 onSubscriptionStateChange: _onSubscriptionStateChange, |
| 726 onPauseStateChange: _onPauseStateChange); |
| 727 _consumer = new _SocketStreamConsumer(this); |
| 728 _sink = new IOSink(_consumer); |
| 729 |
| 730 // Disable read events until there is a subscription. |
| 731 _raw.readEventsEnabled = false; |
| 732 |
| 733 // Disable write events until the consumer needs it for pending writes. |
| 734 _raw.writeEventsEnabled = false; |
| 735 } |
| 736 |
| 737 factory _Socket._writePipe([int fd]) { |
| 738 return new _Socket(new _RawSocket._writePipe(fd)); |
| 739 } |
| 740 |
| 741 factory _Socket._readPipe([int fd]) { |
| 742 return new _Socket(new _RawSocket._readPipe(fd)); |
| 743 } |
| 744 |
| 745 _NativeSocket get _nativeSocket => _raw._socket; |
| 746 |
| 747 StreamSubscription<List<int>> listen(void onData(List<int> event), |
| 748 {void onError(AsyncError error), |
| 749 void onDone(), |
| 750 bool unsubscribeOnError}) { |
| 751 return _controller.stream.listen( |
| 752 onData, |
| 753 onError: onError, |
| 754 onDone: onDone, |
| 755 unsubscribeOnError: unsubscribeOnError); |
| 756 } |
| 757 |
| 758 Future<Socket> consume(Stream<List<int>> stream) { |
| 759 return _sink.consume(stream); |
| 760 } |
| 761 |
| 762 Future<Socket> addStream(Stream<List<int>> stream) { |
| 763 return _sink.addStream(stream); |
| 764 } |
| 765 |
| 766 void add(List<int> data) { |
| 767 return _sink.add(data); |
| 768 } |
| 769 |
| 770 void addString(String string, [Encoding encoding = Encoding.UTF_8]) { |
| 771 return _sink.addString(string, encoding); |
| 772 } |
| 773 |
| 774 close() => _sink.close(); |
| 775 |
| 776 Future<Socket> get done => _sink.done; |
| 777 |
| 778 void destroy() { |
| 779 // Destroy can always be called to get rid of a socket. |
| 780 if (_raw == null) return; |
| 781 _closeRawSocket(); |
| 782 _consumer.stop(); |
| 783 _controllerClosed = true; |
| 784 _controller.close(); |
| 785 } |
| 786 |
| 787 int get port => _raw.port; |
| 788 String get remoteHost => _raw.remoteHost; |
| 789 int get remotePort => _raw.remotePort; |
| 790 |
| 791 // Ensure a subscription on the raw socket. Both the stream and the |
| 792 // consumer needs a subscription as they share the error and done |
| 793 // events from the raw socket. |
| 794 void _ensureRawSocketSubscription() { |
| 795 if (_subscription == null) { |
| 796 _subscription = _raw.listen(_onData, |
| 797 onError: _onError, |
| 798 onDone: _onDone, |
| 799 unsubscribeOnError: true); |
| 800 } |
| 801 } |
| 802 |
| 803 _closeRawSocket() { |
| 804 var tmp = _raw; |
| 805 _raw = null; |
| 806 _closed = true; |
| 807 tmp.close(); |
| 808 } |
| 809 |
| 810 void _onSubscriptionStateChange() { |
| 811 if (_controller.hasSubscribers) { |
| 812 _ensureRawSocketSubscription(); |
| 813 // Enable read events for providing data to subscription. |
| 814 if (_raw != null) { |
| 815 _raw.readEventsEnabled = true; |
| 816 } |
| 817 } else { |
| 818 _controllerClosed = true; |
| 819 if (_raw != null) { |
| 820 _raw.shutdown(SocketDirection.RECEIVE); |
| 821 } |
| 822 } |
| 823 } |
| 824 |
| 825 void _onPauseStateChange() { |
| 826 if (_raw != null) { |
| 827 _raw.readEventsEnabled = !_controller.isPaused; |
| 828 } |
| 829 } |
| 830 |
| 831 void _onData(event) { |
| 832 switch (event) { |
| 833 case RawSocketEvent.READ: |
| 834 _controller.add(_raw.read()); |
| 835 break; |
| 836 case RawSocketEvent.WRITE: |
| 837 _consumer.write(); |
| 838 break; |
| 839 case RawSocketEvent.READ_CLOSED: |
| 840 _controllerClosed = true; |
| 841 _controller.close(); |
| 842 break; |
| 843 } |
| 844 } |
| 845 |
| 846 void _onDone() { |
| 847 if (!_controllerClosed) { |
| 848 _controllerClosed = true; |
| 849 _controller.close(); |
| 850 } |
| 851 _done(); |
| 852 } |
| 853 |
| 854 void _onError(error) { |
| 855 if (!_controllerClosed) { |
| 856 _controllerClosed = true; |
| 857 _controller.signalError(error); |
| 858 _controller.close(); |
| 859 } |
| 860 _done(error); |
| 861 } |
| 862 |
| 863 get _doneFuture { |
| 864 if (_doneCompleter == null) { |
| 865 _ensureRawSocketSubscription(); |
| 866 _doneCompleter = new Completer(); |
| 867 } |
| 868 return _doneCompleter.future; |
| 869 } |
| 870 |
| 871 void _done([error]) { |
| 872 if (_doneCompleter != null) { |
| 873 var tmp = _doneCompleter; |
| 874 _doneCompleter = null; |
| 875 if (error != null) { |
| 876 tmp.completeError(error); |
372 } else { | 877 } else { |
373 return result; | 878 tmp.complete(this); |
374 } | 879 } |
375 } | 880 } |
376 throw new | 881 } |
377 SocketIOException("Error: available failed - invalid socket handle"); | 882 |
378 } | 883 int _write(List<int> data, int offset, int length) => |
379 | 884 _raw.write(data, offset, length); |
380 _available() native "Socket_Available"; | 885 |
381 | 886 void _enableWriteEvent() { |
382 List<int> read([int len]) { | 887 _raw.writeEventsEnabled = true; |
383 if (len != null && len <= 0) { | 888 } |
384 throw new SocketIOException("Illegal length $len"); | 889 |
385 } | 890 void _disableWriteEvent() { |
386 var result = _read(len == null ? -1 : len); | 891 if (_raw != null) { |
387 if (result is OSError) { | 892 _raw.writeEventsEnabled = false; |
388 _reportError(result, "Read failed"); | 893 } |
389 return null; | 894 } |
390 } | 895 |
391 return result; | 896 void _consumerDone([error]) { |
392 } | 897 if (_raw != null) { |
393 | 898 _raw.shutdown(SocketDirection.SEND); |
394 _read(int len) native "Socket_Read"; | 899 _disableWriteEvent(); |
395 | 900 } |
396 int readList(List<int> buffer, int offset, int bytes) { | 901 _done(error); |
397 if (!_closed) { | 902 } |
398 if (bytes == 0) { | 903 } |
399 return 0; | 904 |
400 } | 905 |
401 if (offset < 0) { | 906 class _SecureSocket extends _Socket implements SecureSocket { |
402 throw new RangeError.value(offset); | 907 _SecureSocket(RawSecureSocket raw) : super(raw); |
403 } | 908 |
404 if (bytes < 0) { | 909 void set onBadCertificate(bool callback(X509Certificate certificate)) { |
405 throw new RangeError.value(bytes); | 910 if (_raw == null) { |
406 } | 911 throw new StateError("onBadCertificate called on destroyed SecureSocket"); |
407 if ((offset + bytes) > buffer.length) { | 912 } |
408 throw new RangeError.value(offset + bytes); | 913 _raw.onBadCertificate = callback; |
409 } | 914 } |
410 var result = _readList(buffer, offset, bytes); | 915 |
411 if (result is OSError) { | 916 X509Certificate get peerCertificate { |
412 _reportError(result, "Read failed"); | 917 if (_raw == null) { |
413 return -1; | 918 throw new StateError("peerCertificate called on destroyed SecureSocket"); |
414 } | 919 } |
415 return result; | 920 return _raw.peerCertificate; |
416 } | 921 } |
417 throw new | 922 } |
418 SocketIOException("Error: readList failed - invalid socket handle"); | |
419 } | |
420 | |
421 _readList(List<int> buffer, int offset, int bytes) native "Socket_ReadList"; | |
422 | |
423 int writeList(List<int> buffer, int offset, int bytes) { | |
424 if (buffer is! List || offset is! int || bytes is! int) { | |
425 throw new ArgumentError( | |
426 "Invalid arguments to writeList on Socket"); | |
427 } | |
428 if (!_closed) { | |
429 if (bytes == 0) { | |
430 return 0; | |
431 } | |
432 if (offset < 0) { | |
433 throw new RangeError.value(offset); | |
434 } | |
435 if (bytes < 0) { | |
436 throw new RangeError.value(bytes); | |
437 } | |
438 if ((offset + bytes) > buffer.length) { | |
439 throw new RangeError.value(offset + bytes); | |
440 } | |
441 _BufferAndOffset bufferAndOffset = | |
442 _ensureFastAndSerializableBuffer(buffer, offset, bytes); | |
443 var result = | |
444 _writeList(bufferAndOffset.buffer, bufferAndOffset.offset, bytes); | |
445 if (result is OSError) { | |
446 _reportError(result, "Write failed"); | |
447 // If writing fails we return 0 as the number of bytes and | |
448 // report the error on the error handler. | |
449 result = 0; | |
450 } | |
451 return result; | |
452 } | |
453 throw new SocketIOException("writeList failed - invalid socket handle"); | |
454 } | |
455 | |
456 _writeList(List<int> buffer, int offset, int bytes) native "Socket_WriteList"; | |
457 | |
458 bool _isErrorResponse(response) { | |
459 return response is List && response[0] != _SUCCESS_RESPONSE; | |
460 } | |
461 | |
462 bool _createConnect(String host, int port) native "Socket_CreateConnect"; | |
463 | |
464 void set onWrite(void callback()) { | |
465 if (_outputStream != null) throw new StreamException( | |
466 "Cannot set write handler when output stream is used"); | |
467 _clientWriteHandler = callback; | |
468 _updateOutHandler(); | |
469 } | |
470 | |
471 void set onConnect(void callback()) { | |
472 if (_seenFirstOutEvent) { | |
473 throw new StreamException( | |
474 "Cannot set connect handler when already connected"); | |
475 } | |
476 _clientConnectHandler = callback; | |
477 _updateOutHandler(); | |
478 } | |
479 | |
480 void set onData(void callback()) { | |
481 if (_inputStream != null) throw new StreamException( | |
482 "Cannot set data handler when input stream is used"); | |
483 _onData = callback; | |
484 } | |
485 | |
486 void set onClosed(void callback()) { | |
487 if (_inputStream != null) throw new StreamException( | |
488 "Cannot set close handler when input stream is used"); | |
489 _onClosed = callback; | |
490 } | |
491 | |
492 bool _isListenSocket() => false; | |
493 | |
494 bool _isPipe() => _pipe; | |
495 | |
496 InputStream get inputStream { | |
497 if (_inputStream == null) { | |
498 if (_handlerMap[_SocketBase._IN_EVENT] != null || | |
499 _handlerMap[_SocketBase._CLOSE_EVENT] != null) { | |
500 throw new StreamException( | |
501 "Cannot get input stream when socket handlers are used"); | |
502 } | |
503 _inputStream = new _SocketInputStream(this); | |
504 } | |
505 return _inputStream; | |
506 } | |
507 | |
508 OutputStream get outputStream { | |
509 if (_outputStream == null) { | |
510 if (_clientWriteHandler != null) { | |
511 throw new StreamException( | |
512 "Cannot get output stream when socket handlers are used"); | |
513 } | |
514 _outputStream = new _SocketOutputStream(this); | |
515 } | |
516 return _outputStream; | |
517 } | |
518 | |
519 void set _onWrite(void callback()) { | |
520 _setHandler(_SocketBase._OUT_EVENT, callback); | |
521 } | |
522 | |
523 void set _onData(void callback()) { | |
524 _setHandler(_SocketBase._IN_EVENT, callback); | |
525 } | |
526 | |
527 void set _onClosed(void callback()) { | |
528 _setHandler(_SocketBase._CLOSE_EVENT, callback); | |
529 } | |
530 | |
531 bool _propagateError(Exception e) { | |
532 bool reported = false; | |
533 if (_inputStream != null) { | |
534 reported = reported || _inputStream._onSocketError(e); | |
535 } | |
536 if (_outputStream != null) { | |
537 reported = reported || _outputStream._onSocketError(e); | |
538 } | |
539 return reported; | |
540 } | |
541 | |
542 void _updateOutHandler() { | |
543 void firstWriteHandler() { | |
544 assert(!_seenFirstOutEvent); | |
545 _seenFirstOutEvent = true; | |
546 | |
547 // From now on the write handler is only the client write | |
548 // handler (connect handler cannot be called again). Change this | |
549 // before calling any handlers as handlers can change the | |
550 // handlers. | |
551 if (_clientWriteHandler == null) _onWrite = _clientWriteHandler; | |
552 | |
553 // First out event is socket connected event. | |
554 if (_clientConnectHandler != null) _clientConnectHandler(); | |
555 _clientConnectHandler = null; | |
556 | |
557 // Always (even for the first out event) call the write handler. | |
558 if (_clientWriteHandler != null) _clientWriteHandler(); | |
559 } | |
560 | |
561 if (_clientConnectHandler == null && _clientWriteHandler == null) { | |
562 _onWrite = null; | |
563 } else { | |
564 if (_seenFirstOutEvent) { | |
565 _onWrite = _clientWriteHandler; | |
566 } else { | |
567 _onWrite = firstWriteHandler; | |
568 } | |
569 } | |
570 } | |
571 | |
572 int get remotePort { | |
573 if (_remotePort == null) { | |
574 remoteHost; | |
575 } | |
576 return _remotePort; | |
577 } | |
578 | |
579 String get remoteHost { | |
580 if (_remoteHost == null) { | |
581 List peer = _getRemotePeer(); | |
582 _remoteHost = peer[0]; | |
583 _remotePort = peer[1]; | |
584 } | |
585 return _remoteHost; | |
586 } | |
587 | |
588 List _getRemotePeer() native "Socket_GetRemotePeer"; | |
589 | |
590 static SendPort _newServicePort() native "Socket_NewServicePort"; | |
591 | |
592 static void _ensureSocketService() { | |
593 if (_socketService == null) { | |
594 _socketService = _Socket._newServicePort(); | |
595 } | |
596 } | |
597 | |
598 bool _seenFirstOutEvent = false; | |
599 bool _pipe = false; | |
600 Function _clientConnectHandler; | |
601 Function _clientWriteHandler; | |
602 _SocketInputStream _inputStream; | |
603 _SocketOutputStream _outputStream; | |
604 String _remoteHost; | |
605 int _remotePort; | |
606 static SendPort _socketService; | |
607 } | |
OLD | NEW |