Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(56)

Side by Side Diff: runtime/bin/socket_patch.dart

Issue 16131003: Reapply "Active stream subscriptions". (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Patch from sgjesse fixing file descriptor error. Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/scheduled_test/lib/src/utils.dart ('k') | sdk/lib/_internal/pub/lib/src/error_group.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 patch class RawServerSocket { 5 patch class RawServerSocket {
6 /* patch */ static Future<RawServerSocket> bind(address, 6 /* patch */ static Future<RawServerSocket> bind(address,
7 int port, 7 int port,
8 {int backlog: 0, 8 {int backlog: 0,
9 bool v6Only: false}) { 9 bool v6Only: false}) {
10 return _RawServerSocket.bind(address, port, backlog, v6Only); 10 return _RawServerSocket.bind(address, port, backlog, v6Only);
(...skipping 682 matching lines...) Expand 10 before | Expand all | Expand 10 after
693 int get remotePort => _socket.remotePort; 693 int get remotePort => _socket.remotePort;
694 694
695 InternetAddress get address => _socket.address; 695 InternetAddress get address => _socket.address;
696 696
697 String get remoteHost => _socket.remoteHost; 697 String get remoteHost => _socket.remoteHost;
698 698
699 bool get readEventsEnabled => _readEventsEnabled; 699 bool get readEventsEnabled => _readEventsEnabled;
700 void set readEventsEnabled(bool value) { 700 void set readEventsEnabled(bool value) {
701 if (value != _readEventsEnabled) { 701 if (value != _readEventsEnabled) {
702 _readEventsEnabled = value; 702 _readEventsEnabled = value;
703 if (!_controller.isPaused) _resume(); 703 if (_controller.hasListener && !_controller.isPaused) _resume();
704 } 704 }
705 } 705 }
706 706
707 bool get writeEventsEnabled => _writeEventsEnabled; 707 bool get writeEventsEnabled => _writeEventsEnabled;
708 void set writeEventsEnabled(bool value) { 708 void set writeEventsEnabled(bool value) {
709 if (value != _writeEventsEnabled) { 709 if (value != _writeEventsEnabled) {
710 _writeEventsEnabled = value; 710 _writeEventsEnabled = value;
711 if (!_controller.isPaused) _resume(); 711 if (_controller.hasListener && !_controller.isPaused) _resume();
712 } 712 }
713 } 713 }
714 714
715 bool setOption(SocketOption option, bool enabled) => 715 bool setOption(SocketOption option, bool enabled) =>
716 _socket.setOption(option, enabled); 716 _socket.setOption(option, enabled);
717 717
718 _pause() { 718 _pause() {
719 _socket.setListening(read: false, write: false); 719 _socket.setListening(read: false, write: false);
720 } 720 }
721 721
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
830 830
831 void write() { 831 void write() {
832 try { 832 try {
833 if (subscription == null) return; 833 if (subscription == null) return;
834 assert(buffer != null); 834 assert(buffer != null);
835 // Write as much as possible. 835 // Write as much as possible.
836 offset += socket._write(buffer, offset, buffer.length - offset); 836 offset += socket._write(buffer, offset, buffer.length - offset);
837 if (offset < buffer.length) { 837 if (offset < buffer.length) {
838 if (!paused) { 838 if (!paused) {
839 paused = true; 839 paused = true;
840 // TODO(ajohnsen): It would be nice to avoid this check.
841 // Some info: socket._write can emit an event, if it fails to write.
842 // If the user closes the socket in that event, stop() will be called
843 // before we get a change to pause.
844 if (subscription == null) return;
845 subscription.pause(); 840 subscription.pause();
846 } 841 }
847 socket._enableWriteEvent(); 842 socket._enableWriteEvent();
848 } else { 843 } else {
849 buffer = null; 844 buffer = null;
850 if (paused) { 845 if (paused) {
851 paused = false; 846 paused = false;
852 subscription.resume(); 847 subscription.resume();
853 } 848 }
854 } 849 }
(...skipping 12 matching lines...) Expand all
867 streamCompleter.complete(socket); 862 streamCompleter.complete(socket);
868 } 863 }
869 streamCompleter = null; 864 streamCompleter = null;
870 } 865 }
871 } 866 }
872 867
873 void stop() { 868 void stop() {
874 if (subscription == null) return; 869 if (subscription == null) return;
875 subscription.cancel(); 870 subscription.cancel();
876 subscription = null; 871 subscription = null;
872 paused = false;
877 socket._disableWriteEvent(); 873 socket._disableWriteEvent();
878 } 874 }
879 } 875 }
880 876
881 877
882 class _Socket extends Stream<List<int>> implements Socket { 878 class _Socket extends Stream<List<int>> implements Socket {
883 RawSocket _raw; // Set to null when the raw socket is closed. 879 RawSocket _raw; // Set to null when the raw socket is closed.
884 bool _closed = false; // Set to true when the raw socket is closed. 880 bool _closed = false; // Set to true when the raw socket is closed.
885 StreamController _controller; 881 StreamController _controller;
886 bool _controllerClosed = false; 882 bool _controllerClosed = false;
(...skipping 182 matching lines...) Expand 10 before | Expand all | Expand 10 after
1069 if (_detachReady != null) { 1065 if (_detachReady != null) {
1070 _detachReady.complete(null); 1066 _detachReady.complete(null);
1071 } else { 1067 } else {
1072 if (_raw != null) { 1068 if (_raw != null) {
1073 _raw.shutdown(SocketDirection.SEND); 1069 _raw.shutdown(SocketDirection.SEND);
1074 _disableWriteEvent(); 1070 _disableWriteEvent();
1075 } 1071 }
1076 } 1072 }
1077 } 1073 }
1078 } 1074 }
OLDNEW
« no previous file with comments | « pkg/scheduled_test/lib/src/utils.dart ('k') | sdk/lib/_internal/pub/lib/src/error_group.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698