Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(45)

Side by Side Diff: runtime/bin/socket_patch.dart

Issue 14864009: Keep track of when a socket has been destroyed (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 patch class RawServerSocket { 5 patch class RawServerSocket {
6 /* patch */ static Future<RawServerSocket> bind(address, 6 /* patch */ static Future<RawServerSocket> bind(address,
7 int port, 7 int port,
8 {int backlog: 0, 8 {int backlog: 0,
9 bool v6Only: false}) { 9 bool v6Only: false}) {
10 return _RawServerSocket.bind(address, port, backlog, v6Only); 10 return _RawServerSocket.bind(address, port, backlog, v6Only);
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after
104 // eventhandler. When receiving a message from the eventhandler the 104 // eventhandler. When receiving a message from the eventhandler the
105 // EVENT flags indicate the events that actually happened. The 105 // EVENT flags indicate the events that actually happened. The
106 // COMMAND flags are used to send commands from dart to the 106 // COMMAND flags are used to send commands from dart to the
107 // eventhandler. COMMAND flags are never received from the 107 // eventhandler. COMMAND flags are never received from the
108 // eventhandler. Additional flags are used to communicate other 108 // eventhandler. Additional flags are used to communicate other
109 // information. 109 // information.
110 static const int READ_EVENT = 0; 110 static const int READ_EVENT = 0;
111 static const int WRITE_EVENT = 1; 111 static const int WRITE_EVENT = 1;
112 static const int ERROR_EVENT = 2; 112 static const int ERROR_EVENT = 2;
113 static const int CLOSED_EVENT = 3; 113 static const int CLOSED_EVENT = 3;
114 static const int DESTROYED_EVENT = 4;
114 static const int FIRST_EVENT = READ_EVENT; 115 static const int FIRST_EVENT = READ_EVENT;
115 static const int LAST_EVENT = CLOSED_EVENT; 116 static const int LAST_EVENT = DESTROYED_EVENT;
116 static const int EVENT_COUNT = LAST_EVENT - FIRST_EVENT + 1; 117 static const int EVENT_COUNT = LAST_EVENT - FIRST_EVENT + 1;
117 118
118 static const int CLOSE_COMMAND = 8; 119 static const int CLOSE_COMMAND = 8;
119 static const int SHUTDOWN_READ_COMMAND = 9; 120 static const int SHUTDOWN_READ_COMMAND = 9;
120 static const int SHUTDOWN_WRITE_COMMAND = 10; 121 static const int SHUTDOWN_WRITE_COMMAND = 10;
121 static const int FIRST_COMMAND = CLOSE_COMMAND; 122 static const int FIRST_COMMAND = CLOSE_COMMAND;
122 static const int LAST_COMMAND = SHUTDOWN_WRITE_COMMAND; 123 static const int LAST_COMMAND = SHUTDOWN_WRITE_COMMAND;
123 124
124 // Type flag send to the eventhandler providing additional 125 // Type flag send to the eventhandler providing additional
125 // information on the type of the file descriptor. 126 // information on the type of the file descriptor.
126 static const int LISTENING_SOCKET = 16; 127 static const int LISTENING_SOCKET = 16;
127 static const int PIPE_SOCKET = 17; 128 static const int PIPE_SOCKET = 17;
128 static const int TYPE_NORMAL_SOCKET = 0; 129 static const int TYPE_NORMAL_SOCKET = 0;
129 static const int TYPE_LISTENING_SOCKET = 1 << LISTENING_SOCKET; 130 static const int TYPE_LISTENING_SOCKET = 1 << LISTENING_SOCKET;
130 static const int TYPE_PIPE = 1 << PIPE_SOCKET; 131 static const int TYPE_PIPE = 1 << PIPE_SOCKET;
131 132
132 // Native port messages. 133 // Native port messages.
133 static const HOST_NAME_LOOKUP = 0; 134 static const HOST_NAME_LOOKUP = 0;
134 135
135 // Socket close state 136 // Socket close state
136 bool isClosed = false; 137 bool isClosed = false;
138 bool isClosing = false;
137 bool isClosedRead = false; 139 bool isClosedRead = false;
138 bool isClosedWrite = false; 140 bool isClosedWrite = false;
139 Completer closeCompleter = new Completer(); 141 Completer closeCompleter = new Completer();
140 142
141 // Handlers and receive port for socket events from the event handler. 143 // Handlers and receive port for socket events from the event handler.
142 int eventMask = 0; 144 int eventMask = 0;
143 List eventHandlers; 145 List eventHandlers;
144 ReceivePort eventPort; 146 ReceivePort eventPort;
145 147
146 // Indicates if native interrupts can be activated. 148 // Indicates if native interrupts can be activated.
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after
254 eventHandlers = new List(EVENT_COUNT + 1); 256 eventHandlers = new List(EVENT_COUNT + 1);
255 _EventHandler._start(); 257 _EventHandler._start();
256 } 258 }
257 259
258 _NativeSocket.pipe() : typeFlags = TYPE_PIPE { 260 _NativeSocket.pipe() : typeFlags = TYPE_PIPE {
259 eventHandlers = new List(EVENT_COUNT + 1); 261 eventHandlers = new List(EVENT_COUNT + 1);
260 _EventHandler._start(); 262 _EventHandler._start();
261 } 263 }
262 264
263 int available() { 265 int available() {
264 if (isClosed) return 0; 266 if (isClosing || isClosed) return 0;
265 var result = nativeAvailable(); 267 var result = nativeAvailable();
266 if (result is OSError) { 268 if (result is OSError) {
267 reportError(result, "Available failed"); 269 reportError(result, "Available failed");
268 return 0; 270 return 0;
269 } else { 271 } else {
270 return result; 272 return result;
271 } 273 }
272 } 274 }
273 275
274 List<int> read(int len) { 276 List<int> read(int len) {
275 if (len != null && len <= 0) { 277 if (len != null && len <= 0) {
276 throw new ArgumentError("Illegal length $len"); 278 throw new ArgumentError("Illegal length $len");
277 } 279 }
278 if (isClosed) return null; 280 if (isClosing || isClosed) return null;
279 var result = nativeRead(len == null ? -1 : len); 281 var result = nativeRead(len == null ? -1 : len);
280 if (result is OSError) { 282 if (result is OSError) {
281 reportError(result, "Read failed"); 283 reportError(result, "Read failed");
282 return null; 284 return null;
283 } 285 }
284 return result; 286 return result;
285 } 287 }
286 288
287 int write(List<int> buffer, int offset, int bytes) { 289 int write(List<int> buffer, int offset, int bytes) {
288 if (buffer is! List) throw new ArgumentError(); 290 if (buffer is! List) throw new ArgumentError();
289 if (offset == null) offset = 0; 291 if (offset == null) offset = 0;
290 if (bytes == null) bytes = buffer.length; 292 if (bytes == null) bytes = buffer.length;
291 if (offset < 0) throw new RangeError.value(offset); 293 if (offset < 0) throw new RangeError.value(offset);
292 if (bytes < 0) throw new RangeError.value(bytes); 294 if (bytes < 0) throw new RangeError.value(bytes);
293 if ((offset + bytes) > buffer.length) { 295 if ((offset + bytes) > buffer.length) {
294 throw new RangeError.value(offset + bytes); 296 throw new RangeError.value(offset + bytes);
295 } 297 }
296 if (offset is! int || bytes is! int) { 298 if (offset is! int || bytes is! int) {
297 throw new ArgumentError("Invalid arguments to write on Socket"); 299 throw new ArgumentError("Invalid arguments to write on Socket");
298 } 300 }
299 if (isClosed) return 0; 301 if (isClosing || isClosed) return 0;
300 if (bytes == 0) return 0; 302 if (bytes == 0) return 0;
301 _BufferAndStart bufferAndStart = 303 _BufferAndStart bufferAndStart =
302 _ensureFastAndSerializableBuffer(buffer, offset, offset + bytes); 304 _ensureFastAndSerializableBuffer(buffer, offset, offset + bytes);
303 var result = 305 var result =
304 nativeWrite(bufferAndStart.buffer, bufferAndStart.start, bytes); 306 nativeWrite(bufferAndStart.buffer, bufferAndStart.start, bytes);
305 if (result is OSError) { 307 if (result is OSError) {
306 reportError(result, "Write failed"); 308 reportError(result, "Write failed");
307 result = 0; 309 result = 0;
308 } 310 }
309 return result; 311 return result;
(...skipping 20 matching lines...) Expand all
330 return nativeGetRemotePeer()[0]; 332 return nativeGetRemotePeer()[0];
331 } 333 }
332 334
333 // Multiplexes socket events to the socket handlers. 335 // Multiplexes socket events to the socket handlers.
334 void multiplex(int events) { 336 void multiplex(int events) {
335 canActivateEvents = false; 337 canActivateEvents = false;
336 for (int i = FIRST_EVENT; i <= LAST_EVENT; i++) { 338 for (int i = FIRST_EVENT; i <= LAST_EVENT; i++) {
337 if (((events & (1 << i)) != 0)) { 339 if (((events & (1 << i)) != 0)) {
338 if (i == CLOSED_EVENT && 340 if (i == CLOSED_EVENT &&
339 typeFlags != TYPE_LISTENING_SOCKET && 341 typeFlags != TYPE_LISTENING_SOCKET &&
342 !isClosing &&
340 !isClosed) { 343 !isClosed) {
341 isClosedRead = true; 344 isClosedRead = true;
342 } 345 }
343 346
344 var handler = eventHandlers[i]; 347 var handler = eventHandlers[i];
348 if (i == DESTROYED_EVENT) {
349 assert(!isClosed);
350 isClosed = true;
351 closeCompleter.complete(this);
352 disconnectFromEventHandler();
353 if (handler != null) handler();
354 continue;
355 }
345 assert(handler != null); 356 assert(handler != null);
346 if (i == WRITE_EVENT) { 357 if (i == WRITE_EVENT) {
347 // If the event was disabled before we had a chance to fire the event, 358 // If the event was disabled before we had a chance to fire the event,
348 // discard it. If we register again, we'll get a new one. 359 // discard it. If we register again, we'll get a new one.
349 if ((eventMask & (1 << i)) == 0) continue; 360 if ((eventMask & (1 << i)) == 0) continue;
350 // Unregister the out handler before executing it. There is 361 // Unregister the out handler before executing it. There is
351 // no need to notify the eventhandler as handlers are 362 // no need to notify the eventhandler as handlers are
352 // disabled while the event is handled. 363 // disabled while the event is handled.
353 eventMask &= ~(1 << i); 364 eventMask &= ~(1 << i);
354 } 365 }
(...skipping 10 matching lines...) Expand all
365 } else if (!isClosed) { 376 } else if (!isClosed) {
366 handler(); 377 handler();
367 } 378 }
368 } 379 }
369 } 380 }
370 if (isClosedRead && isClosedWrite) close(); 381 if (isClosedRead && isClosedWrite) close();
371 canActivateEvents = true; 382 canActivateEvents = true;
372 activateHandlers(); 383 activateHandlers();
373 } 384 }
374 385
375 void setHandlers({read: null, write: null, error: null, closed: null}) { 386 void setHandlers({read, write, error, closed, destroyed}) {
376 eventHandlers[READ_EVENT] = read; 387 eventHandlers[READ_EVENT] = read;
377 eventHandlers[WRITE_EVENT] = write; 388 eventHandlers[WRITE_EVENT] = write;
378 eventHandlers[ERROR_EVENT] = error; 389 eventHandlers[ERROR_EVENT] = error;
379 eventHandlers[CLOSED_EVENT] = closed; 390 eventHandlers[CLOSED_EVENT] = closed;
391 eventHandlers[DESTROYED_EVENT] = destroyed;
380 } 392 }
381 393
382 void setListening({read: true, write: true}) { 394 void setListening({read: true, write: true}) {
383 eventMask = (1 << CLOSED_EVENT) | (1 << ERROR_EVENT); 395 eventMask = (1 << CLOSED_EVENT) | (1 << ERROR_EVENT);
384 if (read) eventMask |= (1 << READ_EVENT); 396 if (read) eventMask |= (1 << READ_EVENT);
385 if (write) eventMask |= (1 << WRITE_EVENT); 397 if (write) eventMask |= (1 << WRITE_EVENT);
386 activateHandlers(); 398 activateHandlers();
387 } 399 }
388 400
389 Future get closeFuture => closeCompleter.future; 401 Future<_NativeSocket> get closeFuture => closeCompleter.future;
390 402
391 void activateHandlers() { 403 void activateHandlers() {
392 if (canActivateEvents && !isClosed) { 404 if (canActivateEvents && !isClosed) {
393 // If we don't listen for either read or write, disconnect as we won't 405 if (!isClosing &&
394 // get close and error events anyway. 406 (eventMask & ((1 << READ_EVENT) | (1 << WRITE_EVENT))) == 0) {
395 if ((eventMask & ((1 << READ_EVENT) | (1 << WRITE_EVENT))) == 0) { 407 // If we don't listen for either read or write, disconnect as we won't
408 // get close and error events anyway.
396 if (eventPort != null) disconnectFromEventHandler(); 409 if (eventPort != null) disconnectFromEventHandler();
397 } else { 410 } else {
398 int data = eventMask; 411 int data = eventMask;
412 if (isClosing) {
413 data = 1 << DESTROYED_EVENT;
414 } else {
415 if (isClosedRead) data &= ~(1 << READ_EVENT);
416 if (isClosedWrite) data &= ~(1 << WRITE_EVENT);
417 }
399 data |= typeFlags; 418 data |= typeFlags;
400 if (isClosedRead) data &= ~(1 << READ_EVENT);
401 if (isClosedWrite) data &= ~(1 << WRITE_EVENT);
402 sendToEventHandler(data); 419 sendToEventHandler(data);
403 } 420 }
404 } 421 }
405 } 422 }
406 423
407 void close() { 424 Future<_NativeSocket> close() {
408 if (!isClosed) { 425 if (!isClosing && !isClosed) {
409 sendToEventHandler(1 << CLOSE_COMMAND); 426 sendToEventHandler(1 << CLOSE_COMMAND);
410 isClosed = true; 427 isClosing = true;
411 closeCompleter.complete(this);
412 } 428 }
413 // Outside the if support closing sockets created but never 429 return closeFuture;
414 // assigned any actual socket.
415 disconnectFromEventHandler();
416 } 430 }
417 431
418 void shutdown(SocketDirection direction) { 432 void shutdown(SocketDirection direction) {
419 if (!isClosed) { 433 if (!isClosing && !isClosed) {
420 switch (direction) { 434 switch (direction) {
421 case SocketDirection.RECEIVE: 435 case SocketDirection.RECEIVE:
422 shutdownRead(); 436 shutdownRead();
423 break; 437 break;
424 case SocketDirection.SEND: 438 case SocketDirection.SEND:
425 shutdownWrite(); 439 shutdownWrite();
426 break; 440 break;
427 case SocketDirection.BOTH: 441 case SocketDirection.BOTH:
428 close(); 442 close();
429 break; 443 break;
430 default: 444 default:
431 throw new ArgumentError(direction); 445 throw new ArgumentError(direction);
432 } 446 }
433 } 447 }
434 } 448 }
435 449
436 void shutdownWrite() { 450 void shutdownWrite() {
437 if (!isClosed) { 451 if (!isClosing && !isClosed) {
438 if (isClosedRead) { 452 if (isClosedRead) {
439 close(); 453 close();
440 } else { 454 } else {
441 sendToEventHandler(1 << SHUTDOWN_WRITE_COMMAND); 455 sendToEventHandler(1 << SHUTDOWN_WRITE_COMMAND);
442 } 456 }
443 isClosedWrite = true; 457 isClosedWrite = true;
444 } 458 }
445 } 459 }
446 460
447 void shutdownRead() { 461 void shutdownRead() {
448 if (!isClosed) { 462 if (!isClosing && !isClosed) {
449 if (isClosedWrite) { 463 if (isClosedWrite) {
450 close(); 464 close();
451 } else { 465 } else {
452 sendToEventHandler(1 << SHUTDOWN_READ_COMMAND); 466 sendToEventHandler(1 << SHUTDOWN_READ_COMMAND);
453 } 467 }
454 isClosedRead = true; 468 isClosedRead = true;
455 } 469 }
456 } 470 }
457 471
458 void sendToEventHandler(int data) { 472 void sendToEventHandler(int data) {
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
582 bool cancelOnError}) { 596 bool cancelOnError}) {
583 return _controller.stream.listen( 597 return _controller.stream.listen(
584 onData, 598 onData,
585 onError: onError, 599 onError: onError,
586 onDone: onDone, 600 onDone: onDone,
587 cancelOnError: cancelOnError); 601 cancelOnError: cancelOnError);
588 } 602 }
589 603
590 int get port => _socket.port; 604 int get port => _socket.port;
591 605
592 void close() => _socket.close(); 606 Future close() => _socket.close().then((_) => this);
593 607
594 void _pause() { 608 void _pause() {
595 _socket.setListening(read: false, write: false); 609 _socket.setListening(read: false, write: false);
596 } 610 }
597 611
598 void _resume() { 612 void _resume() {
599 _socket.setListening(read: true, write: false); 613 _socket.setListening(read: true, write: false);
600 } 614 }
601 615
602 void _onSubscriptionStateChange() { 616 void _onSubscriptionStateChange() {
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
638 _socket.closeFuture.then((_) => _controller.close()); 652 _socket.closeFuture.then((_) => _controller.close());
639 _socket.setHandlers( 653 _socket.setHandlers(
640 read: () => _controller.add(RawSocketEvent.READ), 654 read: () => _controller.add(RawSocketEvent.READ),
641 write: () { 655 write: () {
642 // The write event handler is automatically disabled by the 656 // The write event handler is automatically disabled by the
643 // event handler when it fires. 657 // event handler when it fires.
644 _writeEventsEnabled = false; 658 _writeEventsEnabled = false;
645 _controller.add(RawSocketEvent.WRITE); 659 _controller.add(RawSocketEvent.WRITE);
646 }, 660 },
647 closed: () => _controller.add(RawSocketEvent.READ_CLOSED), 661 closed: () => _controller.add(RawSocketEvent.READ_CLOSED),
662 destroyed: () => _controller.add(RawSocketEvent.CLOSED),
648 error: (e) { 663 error: (e) {
649 _controller.addError(e); 664 _controller.addError(e);
650 close(); 665 close();
651 } 666 }
652 ); 667 );
653 } 668 }
654 669
655 factory _RawSocket._writePipe(int fd) { 670 factory _RawSocket._writePipe(int fd) {
656 var native = new _NativeSocket.pipe(); 671 var native = new _NativeSocket.pipe();
657 native.isClosedRead = true; 672 native.isClosedRead = true;
(...skipping 19 matching lines...) Expand all
677 cancelOnError: cancelOnError); 692 cancelOnError: cancelOnError);
678 } 693 }
679 694
680 int available() => _socket.available(); 695 int available() => _socket.available();
681 696
682 List<int> read([int len]) => _socket.read(len); 697 List<int> read([int len]) => _socket.read(len);
683 698
684 int write(List<int> buffer, [int offset, int count]) => 699 int write(List<int> buffer, [int offset, int count]) =>
685 _socket.write(buffer, offset, count); 700 _socket.write(buffer, offset, count);
686 701
687 void close() => _socket.close(); 702 Future close() => _socket.close().then((_) => this);
688 703
689 void shutdown(SocketDirection direction) => _socket.shutdown(direction); 704 void shutdown(SocketDirection direction) => _socket.shutdown(direction);
690 705
691 int get port => _socket.port; 706 int get port => _socket.port;
692 707
693 int get remotePort => _socket.remotePort; 708 int get remotePort => _socket.remotePort;
694 709
695 InternetAddress get address => _socket.address; 710 InternetAddress get address => _socket.address;
696 711
697 String get remoteHost => _socket.remoteHost; 712 String get remoteHost => _socket.remoteHost;
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
770 bool cancelOnError}) { 785 bool cancelOnError}) {
771 return _socket.map((rawSocket) => new _Socket(rawSocket)).listen( 786 return _socket.map((rawSocket) => new _Socket(rawSocket)).listen(
772 onData, 787 onData,
773 onError: onError, 788 onError: onError,
774 onDone: onDone, 789 onDone: onDone,
775 cancelOnError: cancelOnError); 790 cancelOnError: cancelOnError);
776 } 791 }
777 792
778 int get port => _socket.port; 793 int get port => _socket.port;
779 794
780 void close() => _socket.close(); 795 Future close() => _socket.close().then((_) => this);
781 } 796 }
782 797
783 798
784 patch class Socket { 799 patch class Socket {
785 /* patch */ static Future<Socket> connect(host, int port) { 800 /* patch */ static Future<Socket> connect(host, int port) {
786 return RawSocket.connect(host, port).then( 801 return RawSocket.connect(host, port).then(
787 (socket) => new _Socket(socket)); 802 (socket) => new _Socket(socket));
788 } 803 }
789 } 804 }
790 805
(...skipping 159 matching lines...) Expand 10 before | Expand all | Expand 10 after
950 void add(List<int> bytes) => _sink.add(bytes); 965 void add(List<int> bytes) => _sink.add(bytes);
951 966
952 Future<Socket> addStream(Stream<List<int>> stream) { 967 Future<Socket> addStream(Stream<List<int>> stream) {
953 return _sink.addStream(stream); 968 return _sink.addStream(stream);
954 } 969 }
955 970
956 Future<Socket> close() => _sink.close(); 971 Future<Socket> close() => _sink.close();
957 972
958 Future<Socket> get done => _sink.done; 973 Future<Socket> get done => _sink.done;
959 974
960 void destroy() { 975 void destroy() {
Anders Johnsen 2013/05/08 13:11:47 return Future<Socket>.
961 // Destroy can always be called to get rid of a socket. 976 // Destroy can always be called to get rid of a socket.
962 if (_raw == null) return; 977 if (_raw == null) return;
963 _consumer.stop(); 978 _consumer.stop();
964 _closeRawSocket(); 979 _closeRawSocket();
965 _controllerClosed = true; 980 _controllerClosed = true;
966 _controller.close(); 981 _controller.close();
967 } 982 }
968 983
969 bool setOption(SocketOption option, bool enabled) { 984 bool setOption(SocketOption option, bool enabled) {
970 if (_raw == null) return false; 985 if (_raw == null) return false;
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
1095 _raw.onBadCertificate = callback; 1110 _raw.onBadCertificate = callback;
1096 } 1111 }
1097 1112
1098 X509Certificate get peerCertificate { 1113 X509Certificate get peerCertificate {
1099 if (_raw == null) { 1114 if (_raw == null) {
1100 throw new StateError("peerCertificate called on destroyed SecureSocket"); 1115 throw new StateError("peerCertificate called on destroyed SecureSocket");
1101 } 1116 }
1102 return _raw.peerCertificate; 1117 return _raw.peerCertificate;
1103 } 1118 }
1104 } 1119 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698