Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(109)

Side by Side Diff: net/tools/flip_server/flip_in_mem_edsm_server.cc

Issue 6332010: Memory leak fix, log file reopen, remote ip fix, connection idle timeout,... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 9 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/tools/flip_server/flip_config.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include <dirent.h> 5 #include <dirent.h>
6 #include <netinet/tcp.h> // For TCP_NODELAY 6 #include <netinet/tcp.h> // For TCP_NODELAY
7 #include <sys/socket.h> 7 #include <sys/socket.h>
8 #include <sys/types.h> 8 #include <sys/types.h>
9 #include <sys/file.h>
10 #include <sys/stat.h>
9 #include <unistd.h> 11 #include <unistd.h>
10 #include <openssl/err.h> 12 #include <openssl/err.h>
11 #include <openssl/ssl.h> 13 #include <openssl/ssl.h>
12 #include <signal.h> 14 #include <signal.h>
13 15
14 #include <deque> 16 #include <deque>
15 #include <iostream> 17 #include <iostream>
16 #include <limits> 18 #include <limits>
17 #include <vector> 19 #include <vector>
18 #include <list> 20 #include <list>
(...skipping 22 matching lines...) Expand all
41 //////////////////////////////////////////////////////////////////////////////// 43 ////////////////////////////////////////////////////////////////////////////////
42 44
43 using std::deque; 45 using std::deque;
44 using std::list; 46 using std::list;
45 using std::map; 47 using std::map;
46 using std::ostream; 48 using std::ostream;
47 using std::pair; 49 using std::pair;
48 using std::string; 50 using std::string;
49 using std::vector; 51 using std::vector;
50 using std::cout; 52 using std::cout;
53 using std::cerr;
51 54
52 //////////////////////////////////////////////////////////////////////////////// 55 ////////////////////////////////////////////////////////////////////////////////
53 56
54 #define IPV4_PRINTABLE_FORMAT(IP) (((IP)>>0)&0xff),(((IP)>>8)&0xff), \ 57 #define IPV4_PRINTABLE_FORMAT(IP) (((IP)>>0)&0xff),(((IP)>>8)&0xff), \
55 (((IP)>>16)&0xff),(((IP)>>24)&0xff) 58 (((IP)>>16)&0xff),(((IP)>>24)&0xff)
56 59
57 #define ACCEPTOR_CLIENT_IDENT acceptor_->listen_ip_ << ":" \ 60 #define ACCEPTOR_CLIENT_IDENT acceptor_->listen_ip_ << ":" \
58 << acceptor_->listen_port_ << " " 61 << acceptor_->listen_port_ << " "
59 62
60 #define NEXT_PROTO_STRING "\x06spdy/2\x08http/1.1\x08http/1.0" 63 #define NEXT_PROTO_STRING "\x06spdy/2\x08http/1.1\x08http/1.0"
61 64
62 #define SSL_CTX_DEFAULT_CIPHER_LIST "!aNULL:!ADH:!eNull:!LOW:!EXP:RC4+RSA:MEDIUM :HIGH" 65 #define SSL_CTX_DEFAULT_CIPHER_LIST "!aNULL:!ADH:!eNull:!LOW:!EXP:RC4+RSA:MEDIUM :HIGH"
63 66
67 #define PIDFILE "/var/run/flip-server.pid"
68
64 // If true, then disables the nagle algorithm); 69 // If true, then disables the nagle algorithm);
65 bool FLAGS_disable_nagle = true; 70 bool FLAGS_disable_nagle = true;
66 71
67 // The number of times that accept() will be called when the 72 // The number of times that accept() will be called when the
68 // alarm goes off when the accept_using_alarm flag is set to true. 73 // alarm goes off when the accept_using_alarm flag is set to true.
69 // If set to 0, accept() will be performed until the accept queue 74 // If set to 0, accept() will be performed until the accept queue
70 // is completely drained and the accept() call returns an error); 75 // is completely drained and the accept() call returns an error);
71 int32 FLAGS_accepts_per_wake = 0; 76 int32 FLAGS_accepts_per_wake = 0;
72 77
73 // The size of the TCP accept backlog); 78 // The size of the TCP accept backlog);
(...skipping 208 matching lines...) Expand 10 before | Expand all | Expand 10 after
282 287
283 //////////////////////////////////////////////////////////////////////////////// 288 ////////////////////////////////////////////////////////////////////////////////
284 289
285 class DataFrame { 290 class DataFrame {
286 public: 291 public:
287 const char* data; 292 const char* data;
288 size_t size; 293 size_t size;
289 bool delete_when_done; 294 bool delete_when_done;
290 size_t index; 295 size_t index;
291 DataFrame() : data(NULL), size(0), delete_when_done(false), index(0) {} 296 DataFrame() : data(NULL), size(0), delete_when_done(false), index(0) {}
292 void MaybeDelete() { 297 virtual void MaybeDelete() {
293 if (delete_when_done) { 298 if (delete_when_done) {
294 delete[] data; 299 delete[] data;
295 } 300 }
296 } 301 }
302 virtual ~DataFrame() {
303 MaybeDelete();
304 }
305 };
306
307 class SpdyFrameDataFrame : public DataFrame {
308 public:
309 SpdyFrame* frame;
310 virtual void MaybeDelete() {
311 if (delete_when_done) {
312 delete frame;
313 }
314 }
297 }; 315 };
298 316
299 //////////////////////////////////////////////////////////////////////////////// 317 ////////////////////////////////////////////////////////////////////////////////
300 318
301 class StoreBodyAndHeadersVisitor: public BalsaVisitorInterface { 319 class StoreBodyAndHeadersVisitor: public BalsaVisitorInterface {
302 public: 320 public:
303 BalsaHeaders headers; 321 BalsaHeaders headers;
304 string body; 322 string body;
305 bool error_; 323 bool error_;
306 324
(...skipping 384 matching lines...) Expand 10 before | Expand all | Expand 10 after
691 class SMInterface { 709 class SMInterface {
692 public: 710 public:
693 virtual void InitSMInterface(SMInterface* sm_other_interface, 711 virtual void InitSMInterface(SMInterface* sm_other_interface,
694 int32 server_idx) = 0; 712 int32 server_idx) = 0;
695 virtual void InitSMConnection(SMConnectionPoolInterface* connection_pool, 713 virtual void InitSMConnection(SMConnectionPoolInterface* connection_pool,
696 SMInterface* sm_interface, 714 SMInterface* sm_interface,
697 EpollServer* epoll_server, 715 EpollServer* epoll_server,
698 int fd, 716 int fd,
699 string server_ip, 717 string server_ip,
700 string server_port, 718 string server_port,
719 string remote_ip,
701 bool use_ssl) = 0; 720 bool use_ssl) = 0;
702 virtual size_t ProcessReadInput(const char* data, size_t len) = 0; 721 virtual size_t ProcessReadInput(const char* data, size_t len) = 0;
703 virtual size_t ProcessWriteInput(const char* data, size_t len) = 0; 722 virtual size_t ProcessWriteInput(const char* data, size_t len) = 0;
704 virtual void SetStreamID(uint32 stream_id) = 0; 723 virtual void SetStreamID(uint32 stream_id) = 0;
705 virtual bool MessageFullyRead() const = 0; 724 virtual bool MessageFullyRead() const = 0;
706 virtual bool Error() const = 0; 725 virtual bool Error() const = 0;
707 virtual const char* ErrorAsString() const = 0; 726 virtual const char* ErrorAsString() const = 0;
708 virtual void Reset() = 0; 727 virtual void Reset() = 0;
709 virtual void ResetForNewInterface(int32 server_idx) = 0; 728 virtual void ResetForNewInterface(int32 server_idx) = 0;
710 // ResetForNewConnection is used for interfaces which control SMConnection 729 // ResetForNewConnection is used for interfaces which control SMConnection
(...skipping 23 matching lines...) Expand all
734 class SMConnectionInterface { 753 class SMConnectionInterface {
735 public: 754 public:
736 virtual ~SMConnectionInterface() {} 755 virtual ~SMConnectionInterface() {}
737 virtual void ReadyToSend() = 0; 756 virtual void ReadyToSend() = 0;
738 virtual EpollServer* epoll_server() = 0; 757 virtual EpollServer* epoll_server() = 0;
739 }; 758 };
740 759
741 class HttpSM; 760 class HttpSM;
742 class SMConnection; 761 class SMConnection;
743 762
744 typedef list<DataFrame> OutputList; 763 typedef list<DataFrame*> OutputList;
745 764
746 class SMConnectionPoolInterface { 765 class SMConnectionPoolInterface {
747 public: 766 public:
748 virtual ~SMConnectionPoolInterface() {} 767 virtual ~SMConnectionPoolInterface() {}
749 // SMConnections will use this: 768 // SMConnections will use this:
750 virtual void SMConnectionDone(SMConnection* connection) = 0; 769 virtual void SMConnectionDone(SMConnection* connection) = 0;
751 }; 770 };
752 771
753 SMInterface* NewStreamerSM(SMConnection* connection, 772 SMInterface* NewStreamerSM(SMConnection* connection,
754 SMInterface* sm_interface, 773 SMInterface* sm_interface,
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
789 ssl_state_(ssl_state), 808 ssl_state_(ssl_state),
790 memory_cache_(memory_cache), 809 memory_cache_(memory_cache),
791 acceptor_(acceptor), 810 acceptor_(acceptor),
792 read_buffer_(4096*10), 811 read_buffer_(4096*10),
793 sm_spdy_interface_(NULL), 812 sm_spdy_interface_(NULL),
794 sm_http_interface_(NULL), 813 sm_http_interface_(NULL),
795 sm_streamer_interface_(NULL), 814 sm_streamer_interface_(NULL),
796 sm_interface_(NULL), 815 sm_interface_(NULL),
797 log_prefix_(log_prefix), 816 log_prefix_(log_prefix),
798 max_bytes_sent_per_dowrite_(4096), 817 max_bytes_sent_per_dowrite_(4096),
799 ssl_(NULL) 818 ssl_(NULL),
819 last_read_time_(0)
800 {} 820 {}
801 821
802 int fd_; 822 int fd_;
803 int events_; 823 int events_;
804 824
805 bool registered_in_epoll_server_; 825 bool registered_in_epoll_server_;
806 bool initialized_; 826 bool initialized_;
807 bool protocol_detected_; 827 bool protocol_detected_;
808 bool connection_complete_; 828 bool connection_complete_;
809 829
(...skipping 11 matching lines...) Expand all
821 SMInterface* sm_spdy_interface_; 841 SMInterface* sm_spdy_interface_;
822 SMInterface* sm_http_interface_; 842 SMInterface* sm_http_interface_;
823 SMInterface* sm_streamer_interface_; 843 SMInterface* sm_streamer_interface_;
824 SMInterface* sm_interface_; 844 SMInterface* sm_interface_;
825 string log_prefix_; 845 string log_prefix_;
826 846
827 size_t max_bytes_sent_per_dowrite_; 847 size_t max_bytes_sent_per_dowrite_;
828 848
829 SSL* ssl_; 849 SSL* ssl_;
830 public: 850 public:
851 time_t last_read_time_;
831 string server_ip_; 852 string server_ip_;
832 string server_port_; 853 string server_port_;
833 854
834 EpollServer* epoll_server() { return epoll_server_; } 855 EpollServer* epoll_server() { return epoll_server_; }
835 OutputList* output_list() { return &output_list_; } 856 OutputList* output_list() { return &output_list_; }
836 MemoryCache* memory_cache() { return memory_cache_; } 857 MemoryCache* memory_cache() { return memory_cache_; }
837 void ReadyToSend() { 858 void ReadyToSend() {
838 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 859 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
839 << "Setting ready to send: EPOLLIN | EPOLLOUT"; 860 << "Setting ready to send: EPOLLIN | EPOLLOUT";
840 epoll_server_->SetFDReady(fd_, EPOLLIN | EPOLLOUT); 861 epoll_server_->SetFDReady(fd_, EPOLLIN | EPOLLOUT);
841 } 862 }
842 void EnqueueDataFrame(const DataFrame& df) { 863 void EnqueueDataFrame(DataFrame* df) {
843 output_list_.push_back(df); 864 output_list_.push_back(df);
844 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "EnqueueDataFrame: " 865 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "EnqueueDataFrame: "
845 << "size = " << df.size << ": Setting FD ready."; 866 << "size = " << df->size << ": Setting FD ready.";
846 ReadyToSend(); 867 ReadyToSend();
847 } 868 }
848 int fd() { return fd_; } 869 int fd() { return fd_; }
849 870
850 public: 871 public:
851 ~SMConnection() { 872 ~SMConnection() {
852 if (initialized()) { 873 if (initialized()) {
853 Reset(); 874 Reset();
854 } 875 }
855 } 876 }
856 static SMConnection* NewSMConnection(EpollServer* epoll_server, 877 static SMConnection* NewSMConnection(EpollServer* epoll_server,
857 SSLState *ssl_state, 878 SSLState *ssl_state,
858 MemoryCache* memory_cache, 879 MemoryCache* memory_cache,
859 FlipAcceptor *acceptor, 880 FlipAcceptor *acceptor,
860 string log_prefix) { 881 string log_prefix) {
861 return new SMConnection(epoll_server, ssl_state, memory_cache, 882 return new SMConnection(epoll_server, ssl_state, memory_cache,
862 acceptor, log_prefix); 883 acceptor, log_prefix);
863 } 884 }
864 885
865 bool initialized() const { return initialized_; } 886 bool initialized() const { return initialized_; }
866 string client_ip() const { return client_ip_; } 887 string client_ip() const { return client_ip_; }
867 888
868 void InitSMConnection(SMConnectionPoolInterface* connection_pool, 889 void InitSMConnection(SMConnectionPoolInterface* connection_pool,
869 SMInterface* sm_interface, 890 SMInterface* sm_interface,
870 EpollServer* epoll_server, 891 EpollServer* epoll_server,
871 int fd, 892 int fd,
872 string server_ip, 893 string server_ip,
873 string server_port, 894 string server_port,
895 string remote_ip,
874 bool use_ssl) { 896 bool use_ssl) {
875 if (initialized_) { 897 if (initialized_) {
876 LOG(FATAL) << "Attempted to initialize already initialized server"; 898 LOG(FATAL) << "Attempted to initialize already initialized server";
877 return; 899 return;
878 } 900 }
879 901
902 client_ip_ = remote_ip;
903
880 if (fd == -1) { 904 if (fd == -1) {
881 // If fd == -1, then we are initializing a new connection that will 905 // If fd == -1, then we are initializing a new connection that will
882 // connect to the backend. 906 // connect to the backend.
883 // 907 //
884 // ret: -1 == error 908 // ret: -1 == error
885 // 0 == connection in progress 909 // 0 == connection in progress
886 // 1 == connection complete 910 // 1 == connection complete
887 // TODO: is_numeric_host_address value needs to be detected 911 // TODO: is_numeric_host_address value needs to be detected
888 server_ip_ = server_ip; 912 server_ip_ = server_ip;
889 server_port_ = server_port; 913 server_port_ = server_port;
(...skipping 24 matching lines...) Expand all
914 epoll_server_->UnregisterFD(fd_); 938 epoll_server_->UnregisterFD(fd_);
915 } 939 }
916 if (fd_ != -1) { 940 if (fd_ != -1) {
917 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 941 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
918 << "Closing pre-existing fd"; 942 << "Closing pre-existing fd";
919 close(fd_); 943 close(fd_);
920 fd_ = -1; 944 fd_ = -1;
921 } 945 }
922 946
923 fd_ = fd; 947 fd_ = fd;
924 struct sockaddr sock_addr;
925 socklen_t addr_size = sizeof(sock_addr);
926 addr_size = sizeof(sock_addr);
927 int res = getsockname(fd_, &sock_addr, &addr_size);
928 if (res < 0) {
929 LOG(ERROR) << "Could not get socket address for fd " << fd_
930 << ": getsockname: " << strerror(errno);
931 } else {
932 struct sockaddr_in *sock_addr_in = (struct sockaddr_in *)&sock_addr;
933 char ip[16];
934 snprintf(ip, sizeof(ip), "%d.%d.%d.%d",
935 IPV4_PRINTABLE_FORMAT(sock_addr_in->sin_addr.s_addr));
936 client_ip_ = ip;
937 }
938 } 948 }
939 949
940 registered_in_epoll_server_ = false; 950 registered_in_epoll_server_ = false;
951 // Set the last read time here as the idle checker will start from
952 // now.
953 last_read_time_ = time(NULL);
941 initialized_ = true; 954 initialized_ = true;
942 955
943 connection_pool_ = connection_pool; 956 connection_pool_ = connection_pool;
944 epoll_server_ = epoll_server; 957 epoll_server_ = epoll_server;
945 958
946 if (sm_interface) { 959 if (sm_interface) {
947 sm_interface_ = sm_interface; 960 sm_interface_ = sm_interface;
948 protocol_detected_ = true; 961 protocol_detected_ = true;
949 } 962 }
950 963
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
984 } 997 }
985 // If we wrote some data, return that count. Otherwise 998 // If we wrote some data, return that count. Otherwise
986 // return the stall error. 999 // return the stall error.
987 return bytes_written > 0 ? bytes_written : rv; 1000 return bytes_written > 0 ? bytes_written : rv;
988 } 1001 }
989 bytes_written += rv; 1002 bytes_written += rv;
990 len -= rv; 1003 len -= rv;
991 if (rv != chunksize) 1004 if (rv != chunksize)
992 break; // If we couldn't write everything, we're implicitly stalled 1005 break; // If we couldn't write everything, we're implicitly stalled
993 } 1006 }
1007 if (!(flags & MSG_MORE)) {
1008 int state = 0;
1009 setsockopt( fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof( state ) );
1010 state = 1;
1011 setsockopt( fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof( state ) );
1012 }
994 } else { 1013 } else {
995 bytes_written = send(fd_, data, len, flags); 1014 bytes_written = send(fd_, data, len, flags);
996 } 1015 }
997 return bytes_written; 1016 return bytes_written;
998 } 1017 }
999 1018
1000 // the following are from the EpollCallbackInterface 1019 // the following are from the EpollCallbackInterface
1001 virtual void OnRegistration(EpollServer* eps, int fd, int event_mask) { 1020 virtual void OnRegistration(EpollServer* eps, int fd, int event_mask) {
1002 registered_in_epoll_server_ = true; 1021 registered_in_epoll_server_ = true;
1003 } 1022 }
1004 virtual void OnModification(int fd, int event_mask) { } 1023 virtual void OnModification(int fd, int event_mask) { }
1005 virtual void OnEvent(int fd, EpollEvent* event) { 1024 virtual void OnEvent(int fd, EpollEvent* event) {
1006 events_ |= event->in_events; 1025 events_ |= event->in_events;
1007 HandleEvents(); 1026 HandleEvents();
1008 if (events_) { 1027 if (events_) {
1009 event->out_ready_mask = events_; 1028 event->out_ready_mask = events_;
1010 events_ = 0; 1029 events_ = 0;
1011 } 1030 }
1012 } 1031 }
1013 virtual void OnUnregistration(int fd, bool replaced) { 1032 virtual void OnUnregistration(int fd, bool replaced) {
1014 registered_in_epoll_server_ = false; 1033 registered_in_epoll_server_ = false;
1015 } 1034 }
1016 virtual void OnShutdown(EpollServer* eps, int fd) { 1035 virtual void OnShutdown(EpollServer* eps, int fd) {
1017 Cleanup("OnShutdown"); 1036 Cleanup("OnShutdown");
1018 return; 1037 return;
1019 } 1038 }
1020 1039
1021 void Cleanup(const char* cleanup) { 1040 void Cleanup(const char* cleanup) {
1022 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Cleanup"; 1041 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Cleanup: " << cleanup;
1023 if (!initialized_) { 1042 if (!initialized_) {
1024 return; 1043 return;
1025 } 1044 }
1026 Reset(); 1045 Reset();
1027 if (connection_pool_) { 1046 if (connection_pool_) {
1028 connection_pool_->SMConnectionDone(this); 1047 connection_pool_->SMConnectionDone(this);
1029 } 1048 }
1030 if (sm_interface_) { 1049 if (sm_interface_) {
1031 sm_interface_->ResetForNewConnection(); 1050 sm_interface_->ResetForNewConnection();
1032 } 1051 }
1052 last_read_time_ = 0;
1033 } 1053 }
1034 1054
1035 private: 1055 private:
1036 void HandleEvents() { 1056 void HandleEvents() {
1037 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Received: " 1057 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Received: "
1038 << EpollServer::EventMaskToString(events_).c_str(); 1058 << EpollServer::EventMaskToString(events_).c_str();
1039 1059
1040 if (events_ & EPOLLIN) { 1060 if (events_ & EPOLLIN) {
1041 if (!DoRead()) 1061 if (!DoRead())
1042 goto handle_close_or_error; 1062 goto handle_close_or_error;
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
1125 continue; 1145 continue;
1126 default: 1146 default:
1127 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 1147 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
1128 << "While calling recv, got error: " 1148 << "While calling recv, got error: "
1129 << (ssl_?"(ssl error)":strerror(stored_errno)); 1149 << (ssl_?"(ssl error)":strerror(stored_errno));
1130 goto error_or_close; 1150 goto error_or_close;
1131 } 1151 }
1132 } else if (bytes_read > 0) { 1152 } else if (bytes_read > 0) {
1133 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "read " << bytes_read 1153 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "read " << bytes_read
1134 << " bytes"; 1154 << " bytes";
1155 last_read_time_ = time(NULL);
1135 if (!protocol_detected_) { 1156 if (!protocol_detected_) {
1136 if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { 1157 if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) {
1137 // Http Server 1158 // Http Server
1138 protocol_detected_ = true; 1159 protocol_detected_ = true;
1139 if (!sm_http_interface_) { 1160 if (!sm_http_interface_) {
1140 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 1161 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
1141 << "Created HTTP interface."; 1162 << "Created HTTP interface.";
1142 sm_http_interface_ = NewHttpSM(this, NULL, epoll_server_, 1163 sm_http_interface_ = NewHttpSM(this, NULL, epoll_server_,
1143 memory_cache_, acceptor_); 1164 memory_cache_, acceptor_);
1144 } else { 1165 } else {
(...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after
1296 << output_list_.size(); 1317 << output_list_.size();
1297 if (bytes_sent >= max_bytes_sent_per_dowrite_) { 1318 if (bytes_sent >= max_bytes_sent_per_dowrite_) {
1298 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 1319 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
1299 << " byte sent >= max bytes sent per write: Setting EPOLLOUT"; 1320 << " byte sent >= max bytes sent per write: Setting EPOLLOUT";
1300 events_ |= EPOLLOUT; 1321 events_ |= EPOLLOUT;
1301 break; 1322 break;
1302 } 1323 }
1303 if (sm_interface_ && output_list_.size() < 2) { 1324 if (sm_interface_ && output_list_.size() < 2) {
1304 sm_interface_->GetOutput(); 1325 sm_interface_->GetOutput();
1305 } 1326 }
1306 DataFrame& data_frame = output_list_.front(); 1327 DataFrame* data_frame = output_list_.front();
1307 const char* bytes = data_frame.data; 1328 const char* bytes = data_frame->data;
1308 int size = data_frame.size; 1329 int size = data_frame->size;
1309 bytes += data_frame.index; 1330 bytes += data_frame->index;
1310 size -= data_frame.index; 1331 size -= data_frame->index;
1311 DCHECK_GE(size, 0); 1332 DCHECK_GE(size, 0);
1312 if (size <= 0) { 1333 if (size <= 0) {
1313 // Empty data frame. Indicates end of data from client. 1334 // Empty data frame. Indicates end of data from client.
1314 // Uncork the socket. 1335 // Uncork the socket.
1315 int state = 0; 1336 int state = 0;
1316 VLOG(2) << log_prefix_ << "Empty data frame, uncorking socket."; 1337 VLOG(2) << log_prefix_ << "Empty data frame, uncorking socket.";
1317 setsockopt( fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof( state ) ); 1338 setsockopt( fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof( state ) );
1318 data_frame.MaybeDelete();
1319 output_list_.pop_front(); 1339 output_list_.pop_front();
1340 delete data_frame;
1320 continue; 1341 continue;
1321 } 1342 }
1322 1343
1323 flags = MSG_NOSIGNAL | MSG_DONTWAIT; 1344 flags = MSG_NOSIGNAL | MSG_DONTWAIT;
1324 if (output_list_.size() > 1) { 1345 if (output_list_.size() > 1) {
1325 VLOG(2) << log_prefix_ << "Outlist size: " << output_list_.size() 1346 VLOG(2) << log_prefix_ << "Outlist size: " << output_list_.size()
1326 << ": Adding MSG_MORE flag"; 1347 << ": Adding MSG_MORE flag";
1327 flags |= MSG_MORE; 1348 flags |= MSG_MORE;
1328 } 1349 }
1329 VLOG(2) << log_prefix_ << "Attempting to send " << size << " bytes."; 1350 VLOG(2) << log_prefix_ << "Attempting to send " << size << " bytes.";
(...skipping 12 matching lines...) Expand all
1342 continue; 1363 continue;
1343 default: 1364 default:
1344 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 1365 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
1345 << "While calling send, got error: " << stored_errno 1366 << "While calling send, got error: " << stored_errno
1346 << ": " << (ssl_?"":strerror(stored_errno)); 1367 << ": " << (ssl_?"":strerror(stored_errno));
1347 goto error_or_close; 1368 goto error_or_close;
1348 } 1369 }
1349 } else if (bytes_written > 0) { 1370 } else if (bytes_written > 0) {
1350 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Wrote: " 1371 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Wrote: "
1351 << bytes_written << " bytes"; 1372 << bytes_written << " bytes";
1352 data_frame.index += bytes_written; 1373 data_frame->index += bytes_written;
1353 bytes_sent += bytes_written; 1374 bytes_sent += bytes_written;
1354 continue; 1375 continue;
1355 } else if (bytes_written == -2) { 1376 } else if (bytes_written == -2) {
1356 // -2 handles SSL_ERROR_WANT_* errors 1377 // -2 handles SSL_ERROR_WANT_* errors
1357 events_ &= ~EPOLLOUT; 1378 events_ &= ~EPOLLOUT;
1358 goto done; 1379 goto done;
1359 } 1380 }
1360 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 1381 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
1361 << "0 bytes written with send call."; 1382 << "0 bytes written with send call.";
1362 goto error_or_close; 1383 goto error_or_close;
(...skipping 14 matching lines...) Expand all
1377 return os; 1398 return os;
1378 } 1399 }
1379 1400
1380 void Reset() { 1401 void Reset() {
1381 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Resetting"; 1402 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Resetting";
1382 if (ssl_) { 1403 if (ssl_) {
1383 SSL_shutdown(ssl_); 1404 SSL_shutdown(ssl_);
1384 PrintSslError(); 1405 PrintSslError();
1385 SSL_free(ssl_); 1406 SSL_free(ssl_);
1386 PrintSslError(); 1407 PrintSslError();
1408 ssl_ = NULL;
1387 } 1409 }
1388 if (registered_in_epoll_server_) { 1410 if (registered_in_epoll_server_) {
1389 epoll_server_->UnregisterFD(fd_); 1411 epoll_server_->UnregisterFD(fd_);
1390 registered_in_epoll_server_ = false; 1412 registered_in_epoll_server_ = false;
1391 } 1413 }
1392 if (fd_ >= 0) { 1414 if (fd_ >= 0) {
1393 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Closing connection"; 1415 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Closing connection";
1394 close(fd_); 1416 close(fd_);
1395 fd_ = -1; 1417 fd_ = -1;
1396 } 1418 }
1397 read_buffer_.Clear(); 1419 read_buffer_.Clear();
1398 initialized_ = false; 1420 initialized_ = false;
1399 protocol_detected_ = false; 1421 protocol_detected_ = false;
1400 events_ = 0; 1422 events_ = 0;
1423 for (list<DataFrame*>::iterator i =
1424 output_list_.begin();
1425 i != output_list_.end();
1426 ++i) {
1427 delete *i;
1428 }
1401 output_list_.clear(); 1429 output_list_.clear();
1402 } 1430 }
1403 1431
1404 }; 1432 };
1405 1433
1406 //////////////////////////////////////////////////////////////////////////////// 1434 ////////////////////////////////////////////////////////////////////////////////
1407 1435
1408 class OutputOrdering { 1436 class OutputOrdering {
1409 public: 1437 public:
1410 typedef list<MemCacheIter> PriorityRing; 1438 typedef list<MemCacheIter> PriorityRing;
(...skipping 221 matching lines...) Expand 10 before | Expand all | Expand 10 after
1632 1660
1633 void InitSMInterface(SMInterface* sm_http_interface, 1661 void InitSMInterface(SMInterface* sm_http_interface,
1634 int32 server_idx) { } 1662 int32 server_idx) { }
1635 1663
1636 void InitSMConnection(SMConnectionPoolInterface* connection_pool, 1664 void InitSMConnection(SMConnectionPoolInterface* connection_pool,
1637 SMInterface* sm_interface, 1665 SMInterface* sm_interface,
1638 EpollServer* epoll_server, 1666 EpollServer* epoll_server,
1639 int fd, 1667 int fd,
1640 string server_ip, 1668 string server_ip,
1641 string server_port, 1669 string server_port,
1670 string remote_ip,
1642 bool use_ssl) { 1671 bool use_ssl) {
1643 VLOG(2) << ACCEPTOR_CLIENT_IDENT 1672 VLOG(2) << ACCEPTOR_CLIENT_IDENT
1644 << "SpdySM: Initializing server connection."; 1673 << "SpdySM: Initializing server connection.";
1645 connection_->InitSMConnection(connection_pool, sm_interface, 1674 connection_->InitSMConnection(connection_pool, sm_interface,
1646 epoll_server, fd, server_ip, server_port, 1675 epoll_server, fd, server_ip, server_port,
1647 use_ssl); 1676 remote_ip, use_ssl);
1648 } 1677 }
1649 1678
1650 private: 1679 private:
1651 virtual void OnError(SpdyFramer* framer) { 1680 virtual void OnError(SpdyFramer* framer) {
1652 /* do nothing with this right now */ 1681 /* do nothing with this right now */
1653 } 1682 }
1654 1683
1655 SMInterface* NewConnectionInterface() { 1684 SMInterface* NewConnectionInterface() {
1656 SMConnection* server_connection = 1685 SMConnection* server_connection =
1657 SMConnection::NewSMConnection(epoll_server_, NULL, 1686 SMConnection::NewSMConnection(epoll_server_, NULL,
(...skipping 25 matching lines...) Expand all
1683 server_idx = unused_server_interface_list.back(); 1712 server_idx = unused_server_interface_list.back();
1684 unused_server_interface_list.pop_back(); 1713 unused_server_interface_list.pop_back();
1685 sm_http_interface = server_interface_list.at(server_idx); 1714 sm_http_interface = server_interface_list.at(server_idx);
1686 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reusing connection on " 1715 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reusing connection on "
1687 << "index: " << server_idx; 1716 << "index: " << server_idx;
1688 } 1717 }
1689 1718
1690 sm_http_interface->InitSMInterface(this, server_idx); 1719 sm_http_interface->InitSMInterface(this, server_idx);
1691 sm_http_interface->InitSMConnection(NULL, sm_http_interface, 1720 sm_http_interface->InitSMConnection(NULL, sm_http_interface,
1692 epoll_server_, -1, 1721 epoll_server_, -1,
1693 server_ip, server_port, false); 1722 server_ip, server_port, "", false);
1694 1723
1695 return sm_http_interface; 1724 return sm_http_interface;
1696 } 1725 }
1697 1726
1698 int SpdyHandleNewStream(const SpdyControlFrame* frame, 1727 int SpdyHandleNewStream(const SpdyControlFrame* frame,
1699 string *http_data, 1728 string &http_data,
1700 bool *is_https_scheme) 1729 bool *is_https_scheme)
1701 { 1730 {
1702 bool parsed_headers = false; 1731 bool parsed_headers = false;
1703 SpdyHeaderBlock headers; 1732 SpdyHeaderBlock headers;
1704 const SpdySynStreamControlFrame* syn_stream = 1733 const SpdySynStreamControlFrame* syn_stream =
1705 reinterpret_cast<const SpdySynStreamControlFrame*>(frame); 1734 reinterpret_cast<const SpdySynStreamControlFrame*>(frame);
1706 1735
1707 *is_https_scheme = false; 1736 *is_https_scheme = false;
1708 parsed_headers = spdy_framer_->ParseHeaderBlock(frame, &headers); 1737 parsed_headers = spdy_framer_->ParseHeaderBlock(frame, &headers);
1709 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSyn(" 1738 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSyn("
(...skipping 26 matching lines...) Expand all
1736 string host = UrlUtilities::GetUrlHost(url->second); 1765 string host = UrlUtilities::GetUrlHost(url->second);
1737 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second 1766 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second
1738 << " " << uri; 1767 << " " << uri;
1739 string filename = EncodeURL(uri, host, method->second); 1768 string filename = EncodeURL(uri, host, method->second);
1740 NewStream(syn_stream->stream_id(), 1769 NewStream(syn_stream->stream_id(),
1741 reinterpret_cast<const SpdySynStreamControlFrame*>(frame)-> 1770 reinterpret_cast<const SpdySynStreamControlFrame*>(frame)->
1742 priority(), 1771 priority(),
1743 filename); 1772 filename);
1744 } else { 1773 } else {
1745 SpdyHeaderBlock::iterator version = headers.find("version"); 1774 SpdyHeaderBlock::iterator version = headers.find("version");
1746 *http_data += method->second + " " + uri + " " + version->second + "\r\n"; 1775 http_data += method->second + " " + uri + " " + version->second + "\r\n";
1747 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second << " " 1776 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second << " "
1748 << uri << " " << version->second; 1777 << uri << " " << version->second;
1749 for (SpdyHeaderBlock::iterator i = headers.begin(); 1778 for (SpdyHeaderBlock::iterator i = headers.begin();
1750 i != headers.end(); ++i) { 1779 i != headers.end(); ++i) {
1751 *http_data += i->first + ": " + i->second + "\r\n"; 1780 http_data += i->first + ": " + i->second + "\r\n";
1752 VLOG(2) << ACCEPTOR_CLIENT_IDENT << i->first.c_str() << ":" 1781 VLOG(2) << ACCEPTOR_CLIENT_IDENT << i->first.c_str() << ":"
1753 << i->second.c_str(); 1782 << i->second.c_str();
1754 } 1783 }
1755 if (g_proxy_config.forward_ip_header_enabled_) { 1784 if (g_proxy_config.forward_ip_header_enabled_) {
1756 // X-Client-Cluster-IP header 1785 // X-Client-Cluster-IP header
1757 *http_data += g_proxy_config.forward_ip_header_ + ": " + 1786 http_data += g_proxy_config.forward_ip_header_ + ": " +
1758 connection_->client_ip() + "\r\n"; 1787 connection_->client_ip() + "\r\n";
1759 } 1788 }
1760 *http_data += "\r\n"; 1789 http_data += "\r\n";
1761 } 1790 }
1762 1791
1763 VLOG(3) << ACCEPTOR_CLIENT_IDENT << "SpdySM: HTTP Request:\n" << http_data; 1792 VLOG(3) << ACCEPTOR_CLIENT_IDENT << "SpdySM: HTTP Request:\n" << http_data;
1764 return 1; 1793 return 1;
1765 } 1794 }
1766 1795
1767 virtual void OnControl(const SpdyControlFrame* frame) { 1796 virtual void OnControl(const SpdyControlFrame* frame) {
1768 SpdyHeaderBlock headers; 1797 SpdyHeaderBlock headers;
1769 bool parsed_headers = false; 1798 bool parsed_headers = false;
1770 switch (frame->type()) { 1799 switch (frame->type()) {
1771 case SYN_STREAM: 1800 case SYN_STREAM:
1772 { 1801 {
1773 const SpdySynStreamControlFrame* syn_stream = 1802 const SpdySynStreamControlFrame* syn_stream =
1774 reinterpret_cast<const SpdySynStreamControlFrame*>(frame); 1803 reinterpret_cast<const SpdySynStreamControlFrame*>(frame);
1775 1804
1776 string http_data; 1805 string http_data;
1777 bool is_https_scheme; 1806 bool is_https_scheme;
1778 int ret = SpdyHandleNewStream(frame, &http_data, &is_https_scheme); 1807 int ret = SpdyHandleNewStream(frame, http_data, &is_https_scheme);
1779 if (!ret) { 1808 if (!ret) {
1780 LOG(ERROR) << "SpdySM: Could not convert spdy into http."; 1809 LOG(ERROR) << "SpdySM: Could not convert spdy into http.";
1781 break; 1810 break;
1782 } 1811 }
1783 1812
1784 if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { 1813 if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) {
1785 string server_ip; 1814 string server_ip;
1786 string server_port; 1815 string server_port;
1787 if (is_https_scheme) { 1816 if (is_https_scheme) {
1788 server_ip = acceptor_->https_server_ip_; 1817 server_ip = acceptor_->https_server_ip_;
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
1872 next_outgoing_stream_id_ = 2; 1901 next_outgoing_stream_id_ = 2;
1873 } 1902 }
1874 1903
1875 // SMInterface's Cleanup is currently only called by SMConnection after a 1904 // SMInterface's Cleanup is currently only called by SMConnection after a
1876 // protocol message as been fully read. Spdy's SMInterface does not need 1905 // protocol message as been fully read. Spdy's SMInterface does not need
1877 // to do any cleanup at this time. 1906 // to do any cleanup at this time.
1878 // TODO (klindsay) This method is probably not being used properly and 1907 // TODO (klindsay) This method is probably not being used properly and
1879 // some logic review and method renaming is probably in order. 1908 // some logic review and method renaming is probably in order.
1880 void Cleanup() {} 1909 void Cleanup() {}
1881 1910
1882 // Send a settings frame and possibly some NOOP packets to force 1911 // Send a settings frame
1883 // opening of cwnd
1884 int PostAcceptHook() { 1912 int PostAcceptHook() {
1885 ssize_t bytes_written; 1913 ssize_t bytes_written;
1886 spdy::SpdySettings settings; 1914 spdy::SpdySettings settings;
1887 spdy::SettingsFlagsAndId settings_id(0); 1915 spdy::SettingsFlagsAndId settings_id(0);
1888 settings_id.set_id(spdy::SETTINGS_MAX_CONCURRENT_STREAMS); 1916 settings_id.set_id(spdy::SETTINGS_MAX_CONCURRENT_STREAMS);
1889 settings.push_back(spdy::SpdySetting(settings_id, 100)); 1917 settings.push_back(spdy::SpdySetting(settings_id, 100));
1890 scoped_ptr<SpdySettingsControlFrame> 1918 scoped_ptr<SpdySettingsControlFrame>
1891 settings_frame(spdy_framer_->CreateSettings(settings)); 1919 settings_frame(spdy_framer_->CreateSettings(settings));
1892 1920
1893 char* bytes = settings_frame->data(); 1921 char* bytes = settings_frame->data();
(...skipping 141 matching lines...) Expand 10 before | Expand all | Expand 10 after
2035 string original_url = headers.GetHeader("X-Original-Url").as_string(); 2063 string original_url = headers.GetHeader("X-Original-Url").as_string();
2036 block["path"] = UrlUtilities::GetUrlPath(original_url); 2064 block["path"] = UrlUtilities::GetUrlPath(original_url);
2037 } else { 2065 } else {
2038 block["path"] = headers.request_uri().as_string(); 2066 block["path"] = headers.request_uri().as_string();
2039 } 2067 }
2040 CopyHeaders(block, headers); 2068 CopyHeaders(block, headers);
2041 2069
2042 SpdySynStreamControlFrame* fsrcf = 2070 SpdySynStreamControlFrame* fsrcf =
2043 spdy_framer_->CreateSynStream(stream_id, 0, 0, CONTROL_FLAG_NONE, true, 2071 spdy_framer_->CreateSynStream(stream_id, 0, 0, CONTROL_FLAG_NONE, true,
2044 &block); 2072 &block);
2045 DataFrame df; 2073 SpdyFrameDataFrame* df = new SpdyFrameDataFrame;
2046 df.size = fsrcf->length() + SpdyFrame::size(); 2074 df->size = fsrcf->length() + SpdyFrame::size();
2047 size_t df_size = df.size; 2075 size_t df_size = df->size;
2048 df.data = fsrcf->data(); 2076 df->data = fsrcf->data();
2049 df.delete_when_done = true; 2077 df->frame = fsrcf;
2078 df->delete_when_done = true;
2050 EnqueueDataFrame(df); 2079 EnqueueDataFrame(df);
2051 2080
2052 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynStreamheader " 2081 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynStreamheader "
2053 << stream_id; 2082 << stream_id;
2054 return df_size; 2083 return df_size;
2055 } 2084 }
2056 2085
2057 size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) { 2086 size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) {
2058 SpdyHeaderBlock block; 2087 SpdyHeaderBlock block;
2059 CopyHeaders(block, headers); 2088 CopyHeaders(block, headers);
2060 block["status"] = headers.response_code().as_string() + " " + 2089 block["status"] = headers.response_code().as_string() + " " +
2061 headers.response_reason_phrase().as_string(); 2090 headers.response_reason_phrase().as_string();
2062 block["version"] = headers.response_version().as_string(); 2091 block["version"] = headers.response_version().as_string();
2063 2092
2064 SpdySynReplyControlFrame* fsrcf = 2093 SpdySynReplyControlFrame* fsrcf =
2065 spdy_framer_->CreateSynReply(stream_id, CONTROL_FLAG_NONE, true, &block); 2094 spdy_framer_->CreateSynReply(stream_id, CONTROL_FLAG_NONE, true, &block);
2066 DataFrame df; 2095 SpdyFrameDataFrame* df = new SpdyFrameDataFrame;
2067 df.size = fsrcf->length() + SpdyFrame::size(); 2096 df->size = fsrcf->length() + SpdyFrame::size();
2068 size_t df_size = df.size; 2097 size_t df_size = df->size;
2069 df.data = fsrcf->data(); 2098 df->data = fsrcf->data();
2070 df.delete_when_done = true; 2099 df->frame = fsrcf;
2100 df->delete_when_done = true;
2071 EnqueueDataFrame(df); 2101 EnqueueDataFrame(df);
2072 2102
2073 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynReplyheader " 2103 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynReplyheader "
2074 << stream_id; 2104 << stream_id;
2075 return df_size; 2105 return df_size;
2076 } 2106 }
2077 2107
2078 void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, 2108 void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len,
2079 SpdyDataFlags flags, bool compress) { 2109 SpdyDataFlags flags, bool compress) {
2080 // Force compression off if disabled via command line. 2110 // Force compression off if disabled via command line.
2081 if (!FLAGS_use_compression) 2111 if (!FLAGS_use_compression)
2082 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_COMPRESSED); 2112 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_COMPRESSED);
2083 2113
2084 // TODO(mbelshe): We can't compress here - before going into the 2114 // TODO(mbelshe): We can't compress here - before going into the
2085 // priority queue. Compression needs to be done 2115 // priority queue. Compression needs to be done
2086 // with late binding. 2116 // with late binding.
2087
2088 if (len == 0) { 2117 if (len == 0) {
2089 SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, len, 2118 SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, len,
2090 flags); 2119 flags);
2091 DataFrame df; 2120 SpdyFrameDataFrame* df = new SpdyFrameDataFrame;
2092 df.size = fdf->length() + SpdyFrame::size(); 2121 df->size = fdf->length() + SpdyFrame::size();
2093 df.data = fdf->data(); 2122 df->data = fdf->data();
2094 df.delete_when_done = true; 2123 df->delete_when_done = true;
2095 EnqueueDataFrame(df); 2124 EnqueueDataFrame(df);
2096 return; 2125 return;
2097 } 2126 }
2098 2127
2099 // Chop data frames into chunks so that one stream can't monopolize the 2128 // Chop data frames into chunks so that one stream can't monopolize the
2100 // output channel. 2129 // output channel.
2101 while(len > 0) { 2130 while(len > 0) {
2102 int64 size = std::min(len, static_cast<int64>(kSpdySegmentSize)); 2131 int64 size = std::min(len, static_cast<int64>(kSpdySegmentSize));
2103 SpdyDataFlags chunk_flags = flags; 2132 SpdyDataFlags chunk_flags = flags;
2104 2133
2105 // If we chunked this block, and the FIN flag was set, there is more 2134 // If we chunked this block, and the FIN flag was set, there is more
2106 // data coming. So, remove the flag. 2135 // data coming. So, remove the flag.
2107 if ((size < len) && (flags & DATA_FLAG_FIN)) 2136 if ((size < len) && (flags & DATA_FLAG_FIN))
2108 chunk_flags = static_cast<SpdyDataFlags>(chunk_flags & ~DATA_FLAG_FIN); 2137 chunk_flags = static_cast<SpdyDataFlags>(chunk_flags & ~DATA_FLAG_FIN);
2109 2138
2110 SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, size, 2139 SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, size,
2111 chunk_flags); 2140 chunk_flags);
2112 DataFrame df; 2141 SpdyFrameDataFrame* df = new SpdyFrameDataFrame;
2113 df.size = fdf->length() + SpdyFrame::size(); 2142 df->size = fdf->length() + SpdyFrame::size();
2114 df.data = fdf->data(); 2143 df->data = fdf->data();
2115 df.delete_when_done = true; 2144 df->delete_when_done = true;
2116 EnqueueDataFrame(df); 2145 EnqueueDataFrame(df);
2117 2146
2118 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending data frame " 2147 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending data frame "
2119 << stream_id << " [" << size << "] shrunk to " << fdf->length() 2148 << stream_id << " [" << size << "] shrunk to " << fdf->length()
2120 << ", flags=" << flags; 2149 << ", flags=" << flags;
2121 2150
2122 data += size; 2151 data += size;
2123 len -= size; 2152 len -= size;
2124 } 2153 }
2125 } 2154 }
2126 2155
2127 void EnqueueDataFrame(const DataFrame& df) { 2156 void EnqueueDataFrame(DataFrame* df) {
2128 connection_->EnqueueDataFrame(df); 2157 connection_->EnqueueDataFrame(df);
2129 } 2158 }
2130 2159
2131 void GetOutput() { 2160 void GetOutput() {
2132 while (client_output_list_->size() < 2) { 2161 while (client_output_list_->size() < 2) {
2133 MemCacheIter* mci = client_output_ordering_.GetIter(); 2162 MemCacheIter* mci = client_output_ordering_.GetIter();
2134 if (mci == NULL) { 2163 if (mci == NULL) {
2135 VLOG(2) << ACCEPTOR_CLIENT_IDENT 2164 VLOG(2) << ACCEPTOR_CLIENT_IDENT
2136 << "SpdySM: GetOutput: nothing to output!?"; 2165 << "SpdySM: GetOutput: nothing to output!?";
2137 return; 2166 return;
(...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after
2316 sm_spdy_interface_ = sm_spdy_interface; 2345 sm_spdy_interface_ = sm_spdy_interface;
2317 server_idx_ = server_idx; 2346 server_idx_ = server_idx;
2318 } 2347 }
2319 2348
2320 void InitSMConnection(SMConnectionPoolInterface* connection_pool, 2349 void InitSMConnection(SMConnectionPoolInterface* connection_pool,
2321 SMInterface* sm_interface, 2350 SMInterface* sm_interface,
2322 EpollServer* epoll_server, 2351 EpollServer* epoll_server,
2323 int fd, 2352 int fd,
2324 string server_ip, 2353 string server_ip,
2325 string server_port, 2354 string server_port,
2355 string remote_ip,
2326 bool use_ssl) 2356 bool use_ssl)
2327 { 2357 {
2328 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Initializing server " 2358 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Initializing server "
2329 << "connection."; 2359 << "connection.";
2330 connection_->InitSMConnection(connection_pool, sm_interface, 2360 connection_->InitSMConnection(connection_pool, sm_interface,
2331 epoll_server, fd, server_ip, server_port, 2361 epoll_server, fd, server_ip, server_port,
2332 use_ssl); 2362 remote_ip, use_ssl);
2333 } 2363 }
2334 2364
2335 size_t ProcessReadInput(const char* data, size_t len) { 2365 size_t ProcessReadInput(const char* data, size_t len) {
2336 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process read input: stream " 2366 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process read input: stream "
2337 << stream_id_; 2367 << stream_id_;
2338 return http_framer_->ProcessInput(data, len); 2368 return http_framer_->ProcessInput(data, len);
2339 } 2369 }
2340 2370
2341 size_t ProcessWriteInput(const char* data, size_t len) { 2371 size_t ProcessWriteInput(const char* data, size_t len) {
2342 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process write input: size " 2372 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process write input: size "
2343 << len << ": stream " << stream_id_; 2373 << len << ": stream " << stream_id_;
2344 char * dataPtr = new char[len]; 2374 char * dataPtr = new char[len];
2345 memcpy(dataPtr, data, len); 2375 memcpy(dataPtr, data, len);
2346 DataFrame data_frame; 2376 DataFrame* data_frame = new DataFrame;
2347 data_frame.data = (const char *)dataPtr; 2377 data_frame->data = (const char *)dataPtr;
2348 data_frame.size = len; 2378 data_frame->size = len;
2349 data_frame.delete_when_done = true; 2379 data_frame->delete_when_done = true;
2350 connection_->EnqueueDataFrame(data_frame); 2380 connection_->EnqueueDataFrame(data_frame);
2351 return len; 2381 return len;
2352 } 2382 }
2353 2383
2354 bool MessageFullyRead() const { 2384 bool MessageFullyRead() const {
2355 return http_framer_->MessageFullyRead(); 2385 return http_framer_->MessageFullyRead();
2356 } 2386 }
2357 2387
2358 void SetStreamID(uint32 stream_id) { 2388 void SetStreamID(uint32 stream_id) {
2359 stream_id_ = stream_id; 2389 stream_id_ = stream_id;
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
2443 2473
2444 void SendDataFrame(uint32 stream_id, const char* data, int64 len, 2474 void SendDataFrame(uint32 stream_id, const char* data, int64 len,
2445 uint32 flags, bool compress) { 2475 uint32 flags, bool compress) {
2446 SendDataFrameImpl(stream_id, data, len, flags, compress); 2476 SendDataFrameImpl(stream_id, data, len, flags, compress);
2447 } 2477 }
2448 2478
2449 BalsaFrame* spdy_framer() { return http_framer_; } 2479 BalsaFrame* spdy_framer() { return http_framer_; }
2450 2480
2451 private: 2481 private:
2452 void SendEOFImpl(uint32 stream_id) { 2482 void SendEOFImpl(uint32 stream_id) {
2453 DataFrame df; 2483 DataFrame* df = new DataFrame;
2454 df.data = "0\r\n\r\n"; 2484 df->data = "0\r\n\r\n";
2455 df.size = 5; 2485 df->size = 5;
2456 df.delete_when_done = false; 2486 df->delete_when_done = false;
2457 EnqueueDataFrame(df); 2487 EnqueueDataFrame(df);
2458 if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { 2488 if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) {
2459 Reset(); 2489 Reset();
2460 } 2490 }
2461 } 2491 }
2462 2492
2463 void SendErrorNotFoundImpl(uint32 stream_id) { 2493 void SendErrorNotFoundImpl(uint32 stream_id) {
2464 BalsaHeaders my_headers; 2494 BalsaHeaders my_headers;
2465 my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "404", "Not Found"); 2495 my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "404", "Not Found");
2466 my_headers.RemoveAllOfHeader("content-length"); 2496 my_headers.RemoveAllOfHeader("content-length");
(...skipping 11 matching lines...) Expand all
2478 my_headers.AppendHeader("transfer-encoding", "chunked"); 2508 my_headers.AppendHeader("transfer-encoding", "chunked");
2479 SendSynReplyImpl(stream_id, my_headers); 2509 SendSynReplyImpl(stream_id, my_headers);
2480 SendDataFrame(stream_id, output->c_str(), output->size(), 0, false); 2510 SendDataFrame(stream_id, output->c_str(), output->size(), 0, false);
2481 SendEOFImpl(stream_id); 2511 SendEOFImpl(stream_id);
2482 output_ordering_.RemoveStreamId(stream_id); 2512 output_ordering_.RemoveStreamId(stream_id);
2483 } 2513 }
2484 2514
2485 size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) { 2515 size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) {
2486 SimpleBuffer sb; 2516 SimpleBuffer sb;
2487 headers.WriteHeaderAndEndingToBuffer(&sb); 2517 headers.WriteHeaderAndEndingToBuffer(&sb);
2488 DataFrame df; 2518 DataFrame* df = new DataFrame;
2489 df.size = sb.ReadableBytes(); 2519 df->size = sb.ReadableBytes();
2490 char* buffer = new char[df.size]; 2520 char* buffer = new char[df->size];
2491 df.data = buffer; 2521 df->data = buffer;
2492 df.delete_when_done = true; 2522 df->delete_when_done = true;
2493 sb.Read(buffer, df.size); 2523 sb.Read(buffer, df->size);
2494 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header " 2524 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header "
2495 << stream_id_; 2525 << stream_id_;
2496 size_t df_size = df.size; 2526 size_t df_size = df->size;
2497 EnqueueDataFrame(df); 2527 EnqueueDataFrame(df);
2498 return df_size; 2528 return df_size;
2499 } 2529 }
2500 2530
2501 size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) { 2531 size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) {
2502 SimpleBuffer sb; 2532 SimpleBuffer sb;
2503 headers.WriteHeaderAndEndingToBuffer(&sb); 2533 headers.WriteHeaderAndEndingToBuffer(&sb);
2504 DataFrame df; 2534 DataFrame* df = new DataFrame;
2505 df.size = sb.ReadableBytes(); 2535 df->size = sb.ReadableBytes();
2506 char* buffer = new char[df.size]; 2536 char* buffer = new char[df->size];
2507 df.data = buffer; 2537 df->data = buffer;
2508 df.delete_when_done = true; 2538 df->delete_when_done = true;
2509 sb.Read(buffer, df.size); 2539 sb.Read(buffer, df->size);
2510 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header " 2540 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header "
2511 << stream_id_; 2541 << stream_id_;
2512 size_t df_size = df.size; 2542 size_t df_size = df->size;
2513 EnqueueDataFrame(df); 2543 EnqueueDataFrame(df);
2514 return df_size; 2544 return df_size;
2515 } 2545 }
2516 2546
2517 void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, 2547 void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len,
2518 uint32 flags, bool compress) { 2548 uint32 flags, bool compress) {
2519 char chunk_buf[128]; 2549 char chunk_buf[128];
2520 snprintf(chunk_buf, sizeof(chunk_buf), "%x\r\n", (unsigned int)len); 2550 snprintf(chunk_buf, sizeof(chunk_buf), "%x\r\n", (unsigned int)len);
2521 string chunk_description(chunk_buf); 2551 string chunk_description(chunk_buf);
2522 DataFrame df; 2552 DataFrame* df = new DataFrame;
2523 df.size = chunk_description.size() + len + 2; 2553 df->size = chunk_description.size() + len + 2;
2524 char* buffer = new char[df.size]; 2554 char* buffer = new char[df->size];
2525 df.data = buffer; 2555 df->data = buffer;
2526 df.delete_when_done = true; 2556 df->delete_when_done = true;
2527 memcpy(buffer, chunk_description.data(), chunk_description.size()); 2557 memcpy(buffer, chunk_description.data(), chunk_description.size());
2528 memcpy(buffer + chunk_description.size(), data, len); 2558 memcpy(buffer + chunk_description.size(), data, len);
2529 memcpy(buffer + chunk_description.size() + len, "\r\n", 2); 2559 memcpy(buffer + chunk_description.size() + len, "\r\n", 2);
2530 EnqueueDataFrame(df); 2560 EnqueueDataFrame(df);
2531 } 2561 }
2532 2562
2533 void EnqueueDataFrame(const DataFrame& df) { 2563 void EnqueueDataFrame(DataFrame* df) {
2534 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Enqueue data frame: stream " 2564 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Enqueue data frame: stream "
2535 << stream_id_; 2565 << stream_id_;
2536 connection_->EnqueueDataFrame(df); 2566 connection_->EnqueueDataFrame(df);
2537 } 2567 }
2538 2568
2539 void GetOutput() { 2569 void GetOutput() {
2540 MemCacheIter* mci = output_ordering_.GetIter(); 2570 MemCacheIter* mci = output_ordering_.GetIter();
2541 if (mci == NULL) { 2571 if (mci == NULL) {
2542 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput: nothing to " 2572 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput: nothing to "
2543 << "output!?: stream " << stream_id_; 2573 << "output!?: stream " << stream_id_;
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
2603 { 2633 {
2604 sm_other_interface_ = sm_other_interface; 2634 sm_other_interface_ = sm_other_interface;
2605 } 2635 }
2606 2636
2607 void InitSMConnection(SMConnectionPoolInterface* connection_pool, 2637 void InitSMConnection(SMConnectionPoolInterface* connection_pool,
2608 SMInterface* sm_interface, 2638 SMInterface* sm_interface,
2609 EpollServer* epoll_server, 2639 EpollServer* epoll_server,
2610 int fd, 2640 int fd,
2611 string server_ip, 2641 string server_ip,
2612 string server_port, 2642 string server_port,
2643 string remote_ip,
2613 bool use_ssl) 2644 bool use_ssl)
2614 { 2645 {
2615 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Initializing server " 2646 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Initializing server "
2616 << "connection."; 2647 << "connection.";
2617 connection_->InitSMConnection(connection_pool, sm_interface, 2648 connection_->InitSMConnection(connection_pool, sm_interface,
2618 epoll_server, fd, server_ip, 2649 epoll_server, fd, server_ip,
2619 server_port, use_ssl); 2650 server_port, remote_ip, use_ssl);
2620 } 2651 }
2621 2652
2622 size_t ProcessReadInput(const char* data, size_t len) { 2653 size_t ProcessReadInput(const char* data, size_t len) {
2623 return sm_other_interface_->ProcessWriteInput(data, len); 2654 return sm_other_interface_->ProcessWriteInput(data, len);
2624 } 2655 }
2625 2656
2626 size_t ProcessWriteInput(const char* data, size_t len) { 2657 size_t ProcessWriteInput(const char* data, size_t len) {
2627 char * dataPtr = new char[len]; 2658 char * dataPtr = new char[len];
2628 memcpy(dataPtr, data, len); 2659 memcpy(dataPtr, data, len);
2629 DataFrame df; 2660 DataFrame* df = new DataFrame;
2630 df.data = (const char *)dataPtr; 2661 df->data = (const char *)dataPtr;
2631 df.size = len; 2662 df->size = len;
2632 df.delete_when_done = true; 2663 df->delete_when_done = true;
2633 connection_->EnqueueDataFrame(df); 2664 connection_->EnqueueDataFrame(df);
2634 return len; 2665 return len;
2635 } 2666 }
2636 2667
2637 bool MessageFullyRead() const { 2668 bool MessageFullyRead() const {
2638 return false; 2669 return false;
2639 } 2670 }
2640 2671
2641 void SetStreamID(uint32 stream_id) {} 2672 void SetStreamID(uint32 stream_id) {}
2642 2673
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
2677 sm_other_interface_ = new StreamerSM(server_connection, this, 2708 sm_other_interface_ = new StreamerSM(server_connection, this,
2678 epoll_server_, acceptor_); 2709 epoll_server_, acceptor_);
2679 sm_other_interface_->InitSMInterface(this, 0); 2710 sm_other_interface_->InitSMInterface(this, 0);
2680 } 2711 }
2681 // The Streamer interface is used to stream HTTPS connections, so we 2712 // The Streamer interface is used to stream HTTPS connections, so we
2682 // will always use the https_server_ip/port here. 2713 // will always use the https_server_ip/port here.
2683 sm_other_interface_->InitSMConnection(NULL, sm_other_interface_, 2714 sm_other_interface_->InitSMConnection(NULL, sm_other_interface_,
2684 epoll_server_, -1, 2715 epoll_server_, -1,
2685 acceptor_->https_server_ip_, 2716 acceptor_->https_server_ip_,
2686 acceptor_->https_server_port_, 2717 acceptor_->https_server_port_,
2718 "",
2687 false); 2719 false);
2688 2720
2689 return 1; 2721 return 1;
2690 } 2722 }
2691 2723
2692 void NewStream(uint32 stream_id, uint32 priority, const string& filename) { 2724 void NewStream(uint32 stream_id, uint32 priority, const string& filename) {
2693 } 2725 }
2694 2726
2695 void AddToOutputOrder(const MemCacheIter& mci) { 2727 void AddToOutputOrder(const MemCacheIter& mci) {
2696 } 2728 }
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
2766 public EpollCallbackInterface, 2798 public EpollCallbackInterface,
2767 public SMConnectionPoolInterface { 2799 public SMConnectionPoolInterface {
2768 EpollServer epoll_server_; 2800 EpollServer epoll_server_;
2769 FlipAcceptor *acceptor_; 2801 FlipAcceptor *acceptor_;
2770 SSLState *ssl_state_; 2802 SSLState *ssl_state_;
2771 bool use_ssl_; 2803 bool use_ssl_;
2772 2804
2773 vector<SMConnection*> unused_server_connections_; 2805 vector<SMConnection*> unused_server_connections_;
2774 vector<SMConnection*> tmp_unused_server_connections_; 2806 vector<SMConnection*> tmp_unused_server_connections_;
2775 vector<SMConnection*> allocated_server_connections_; 2807 vector<SMConnection*> allocated_server_connections_;
2808 list<SMConnection*> active_server_connections_;
2776 Notification quitting_; 2809 Notification quitting_;
2777 MemoryCache* memory_cache_; 2810 MemoryCache* memory_cache_;
2778 public: 2811 public:
2779 2812
2780 SMAcceptorThread(FlipAcceptor *acceptor, 2813 SMAcceptorThread(FlipAcceptor *acceptor,
2781 MemoryCache* memory_cache) : 2814 MemoryCache* memory_cache) :
2782 SimpleThread("SMAcceptorThread"), 2815 SimpleThread("SMAcceptorThread"),
2783 acceptor_(acceptor), 2816 acceptor_(acceptor),
2784 ssl_state_(NULL), 2817 ssl_state_(NULL),
2785 use_ssl_(false), 2818 use_ssl_(false),
(...skipping 13 matching lines...) Expand all
2799 } 2832 }
2800 } 2833 }
2801 2834
2802 ~SMAcceptorThread() { 2835 ~SMAcceptorThread() {
2803 for (vector<SMConnection*>::iterator i = 2836 for (vector<SMConnection*>::iterator i =
2804 allocated_server_connections_.begin(); 2837 allocated_server_connections_.begin();
2805 i != allocated_server_connections_.end(); 2838 i != allocated_server_connections_.end();
2806 ++i) { 2839 ++i) {
2807 delete *i; 2840 delete *i;
2808 } 2841 }
2842 delete ssl_state_;
2809 } 2843 }
2810 2844
2811 SMConnection* NewConnection() { 2845 SMConnection* NewConnection() {
2812 SMConnection* server = 2846 SMConnection* server =
2813 SMConnection::NewSMConnection(&epoll_server_, ssl_state_, 2847 SMConnection::NewSMConnection(&epoll_server_, ssl_state_,
2814 memory_cache_, acceptor_, 2848 memory_cache_, acceptor_,
2815 "client_conn: "); 2849 "client_conn: ");
2816 allocated_server_connections_.push_back(server); 2850 allocated_server_connections_.push_back(server);
2817 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Making new server."; 2851 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Making new server.";
2818 return server; 2852 return server;
2819 } 2853 }
2820 2854
2821 SMConnection* FindOrMakeNewSMConnection() { 2855 SMConnection* FindOrMakeNewSMConnection() {
2822 if (unused_server_connections_.empty()) { 2856 if (unused_server_connections_.empty()) {
2823 return NewConnection(); 2857 return NewConnection();
2824 } 2858 }
2825 SMConnection* server = unused_server_connections_.back(); 2859 SMConnection* server = unused_server_connections_.back();
2826 unused_server_connections_.pop_back(); 2860 unused_server_connections_.pop_back();
2827 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Reusing server."; 2861 VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Reusing server.";
2828 return server; 2862 return server;
2829 } 2863 }
2830 2864
2831 void InitWorker() { 2865 void InitWorker() {
2832 epoll_server_.RegisterFD(acceptor_->listen_fd_, this, EPOLLIN | EPOLLET); 2866 epoll_server_.RegisterFD(acceptor_->listen_fd_, this, EPOLLIN | EPOLLET);
2833 } 2867 }
2834 2868
2835 void HandleConnection(int server_fd) { 2869 void HandleConnection(int server_fd, struct sockaddr_in *remote_addr) {
2836 int on = 1; 2870 int on = 1;
2837 int rc; 2871 int rc;
2838 if (acceptor_->disable_nagle_) { 2872 if (acceptor_->disable_nagle_) {
2839 rc = setsockopt(server_fd, IPPROTO_TCP, TCP_NODELAY, 2873 rc = setsockopt(server_fd, IPPROTO_TCP, TCP_NODELAY,
2840 reinterpret_cast<char*>(&on), sizeof(on)); 2874 reinterpret_cast<char*>(&on), sizeof(on));
2841 if (rc < 0) { 2875 if (rc < 0) {
2842 close(server_fd); 2876 close(server_fd);
2843 LOG(ERROR) << "setsockopt() failed fd=" + server_fd; 2877 LOG(ERROR) << "setsockopt() failed fd=" + server_fd;
2844 return; 2878 return;
2845 } 2879 }
2846 } 2880 }
2847 2881
2848 SMConnection* server_connection = FindOrMakeNewSMConnection(); 2882 SMConnection* server_connection = FindOrMakeNewSMConnection();
2849 if (server_connection == NULL) { 2883 if (server_connection == NULL) {
2850 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Closing fd " << server_fd; 2884 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Closing fd " << server_fd;
2851 close(server_fd); 2885 close(server_fd);
2852 return; 2886 return;
2853 } 2887 }
2888 string remote_ip = inet_ntoa(remote_addr->sin_addr);
2854 server_connection->InitSMConnection(this, 2889 server_connection->InitSMConnection(this,
2855 NULL, 2890 NULL,
2856 &epoll_server_, 2891 &epoll_server_,
2857 server_fd, 2892 server_fd,
2858 "", "", 2893 "", "", remote_ip,
2859 use_ssl_); 2894 use_ssl_);
2895 if (server_connection->initialized())
2896 active_server_connections_.push_back(server_connection);
2860 } 2897 }
2861 2898
2862 void AcceptFromListenFD() { 2899 void AcceptFromListenFD() {
2863 if (acceptor_->accepts_per_wake_ > 0) { 2900 if (acceptor_->accepts_per_wake_ > 0) {
2864 for (int i = 0; i < acceptor_->accepts_per_wake_; ++i) { 2901 for (int i = 0; i < acceptor_->accepts_per_wake_; ++i) {
2865 struct sockaddr address; 2902 struct sockaddr address;
2866 socklen_t socklen = sizeof(address); 2903 socklen_t socklen = sizeof(address);
2867 int fd = accept(acceptor_->listen_fd_, &address, &socklen); 2904 int fd = accept(acceptor_->listen_fd_, &address, &socklen);
2868 if (fd == -1) { 2905 if (fd == -1) {
2869 if (errno != 11) { 2906 if (errno != 11) {
2870 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail(" 2907 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail("
2871 << acceptor_->listen_fd_ << "): " << errno << ": " 2908 << acceptor_->listen_fd_ << "): " << errno << ": "
2872 << strerror(errno); 2909 << strerror(errno);
2873 } 2910 }
2874 break; 2911 break;
2875 } 2912 }
2876 VLOG(1) << ACCEPTOR_CLIENT_IDENT << " Accepted connection"; 2913 VLOG(1) << ACCEPTOR_CLIENT_IDENT << " Accepted connection";
2877 HandleConnection(fd); 2914 HandleConnection(fd, (struct sockaddr_in *)&address);
2878 } 2915 }
2879 } else { 2916 } else {
2880 while (true) { 2917 while (true) {
2881 struct sockaddr address; 2918 struct sockaddr address;
2882 socklen_t socklen = sizeof(address); 2919 socklen_t socklen = sizeof(address);
2883 int fd = accept(acceptor_->listen_fd_, &address, &socklen); 2920 int fd = accept(acceptor_->listen_fd_, &address, &socklen);
2884 if (fd == -1) { 2921 if (fd == -1) {
2885 if (errno != 11) { 2922 if (errno != 11) {
2886 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail(" 2923 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail("
2887 << acceptor_->listen_fd_ << "): " << errno << ": " 2924 << acceptor_->listen_fd_ << "): " << errno << ": "
2888 << strerror(errno); 2925 << strerror(errno);
2889 } 2926 }
2890 break; 2927 break;
2891 } 2928 }
2892 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Accepted connection"; 2929 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Accepted connection";
2893 HandleConnection(fd); 2930 HandleConnection(fd, (struct sockaddr_in *)&address);
2894 } 2931 }
2895 } 2932 }
2896 } 2933 }
2897 2934
2898 // EpollCallbackInteface virtual functions. 2935 // EpollCallbackInteface virtual functions.
2899 virtual void OnRegistration(EpollServer* eps, int fd, int event_mask) { } 2936 virtual void OnRegistration(EpollServer* eps, int fd, int event_mask) { }
2900 virtual void OnModification(int fd, int event_mask) { } 2937 virtual void OnModification(int fd, int event_mask) { }
2901 virtual void OnEvent(int fd, EpollEvent* event) { 2938 virtual void OnEvent(int fd, EpollEvent* event) {
2902 if (event->in_events | EPOLLIN) { 2939 if (event->in_events | EPOLLIN) {
2903 VLOG(2) << ACCEPTOR_CLIENT_IDENT 2940 VLOG(2) << ACCEPTOR_CLIENT_IDENT
2904 << "Acceptor: Accepting based upon epoll events"; 2941 << "Acceptor: Accepting based upon epoll events";
2905 AcceptFromListenFD(); 2942 AcceptFromListenFD();
2906 } 2943 }
2907 } 2944 }
2908 virtual void OnUnregistration(int fd, bool replaced) { } 2945 virtual void OnUnregistration(int fd, bool replaced) { }
2909 virtual void OnShutdown(EpollServer* eps, int fd) { } 2946 virtual void OnShutdown(EpollServer* eps, int fd) { }
2910 2947
2911 void Quit() { 2948 void Quit() {
2912 quitting_.Notify(); 2949 quitting_.Notify();
2913 } 2950 }
2914 2951
2952 // Iterates through a list of active connections expiring any that have been
2953 // idle longer than the configured timeout.
2954 void HandleConnectionIdleTimeout() {
2955 int cur_time = time(NULL);
2956 static time_t oldest_time = cur_time;
2957 // Only iterate the list if we speculate that a connection is ready to be
2958 // expired
2959 if ((cur_time - oldest_time) < g_proxy_config.idle_timeout_s_)
2960 return;
2961 list<SMConnection*>::iterator iter = active_server_connections_.begin();
2962 while (iter != active_server_connections_.end()) {
2963 SMConnection *conn = *iter;
2964 int elapsed_time = (cur_time - conn->last_read_time_);
2965 if (elapsed_time > g_proxy_config.idle_timeout_s_) {
2966 conn->Cleanup("Connection idle timeout reached.");
2967 iter = active_server_connections_.erase(iter);
2968 continue;
2969 }
2970 if (conn->last_read_time_ < oldest_time)
2971 oldest_time = conn->last_read_time_;
2972 iter++;
2973 }
2974 if ((cur_time - oldest_time) >= g_proxy_config.idle_timeout_s_)
2975 oldest_time = cur_time;
2976 }
2977
2915 void Run() { 2978 void Run() {
2916 while (!quitting_.HasBeenNotified()) { 2979 while (!quitting_.HasBeenNotified()) {
2917 epoll_server_.set_timeout_in_us(10 * 1000); // 10 ms 2980 epoll_server_.set_timeout_in_us(10 * 1000); // 10 ms
2918 epoll_server_.WaitForEventsAndExecuteCallbacks(); 2981 epoll_server_.WaitForEventsAndExecuteCallbacks();
2919 unused_server_connections_.insert(unused_server_connections_.end(), 2982 unused_server_connections_.insert(unused_server_connections_.end(),
2920 tmp_unused_server_connections_.begin(), 2983 tmp_unused_server_connections_.begin(),
2921 tmp_unused_server_connections_.end()); 2984 tmp_unused_server_connections_.end());
2922 tmp_unused_server_connections_.clear(); 2985 tmp_unused_server_connections_.clear();
2986 HandleConnectionIdleTimeout();
2923 } 2987 }
2924 } 2988 }
2925 2989
2926 // SMConnections will use this: 2990 // SMConnections will use this:
2927 virtual void SMConnectionDone(SMConnection* sc) { 2991 virtual void SMConnectionDone(SMConnection* sc) {
2928 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Done with connection."; 2992 VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Done with connection.";
2929 tmp_unused_server_connections_.push_back(sc); 2993 tmp_unused_server_connections_.push_back(sc);
2930 } 2994 }
2931 }; 2995 };
2932 2996
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
2993 } 3057 }
2994 3058
2995 const char* BoolToStr(bool b) { 3059 const char* BoolToStr(bool b) {
2996 if (b) 3060 if (b)
2997 return "true"; 3061 return "true";
2998 return "false"; 3062 return "false";
2999 } 3063 }
3000 3064
3001 //////////////////////////////////////////////////////////////////////////////// 3065 ////////////////////////////////////////////////////////////////////////////////
3002 3066
3067 static bool wantExit = false;
3068 static bool wantLogClose = false;
3069 void SignalHandler(int signum)
3070 {
3071 switch(signum) {
3072 case SIGTERM:
3073 case SIGINT:
3074 wantExit = true;
3075 break;
3076 case SIGHUP:
3077 wantLogClose = true;
3078 break;
3079 }
3080 }
3081
3082 static int OpenPidFile(const char *pidfile)
3083 {
3084 int fd;
3085 struct stat pid_stat;
3086 int ret;
3087
3088 fd = open(pidfile, O_RDWR | O_CREAT, 0600);
3089 if (fd == -1) {
3090 cerr << "Could not open pid file '" << pidfile << "' for reading.\n";
3091 exit(1);
3092 }
3093
3094 ret = flock(fd, LOCK_EX | LOCK_NB);
3095 if (ret == -1) {
3096 if (errno == EWOULDBLOCK) {
3097 cerr << "Flip server is already running.\n";
3098 } else {
3099 cerr << "Error getting lock on pid file: " << strerror(errno) << "\n";
3100 }
3101 exit(1);
3102 }
3103
3104 if (fstat(fd, &pid_stat) == -1) {
3105 cerr << "Could not stat pid file '" << pidfile << "': " << strerror(errno)
3106 << "\n";
3107 }
3108 if (pid_stat.st_size != 0) {
3109 if (ftruncate(fd, pid_stat.st_size) == -1) {
3110 cerr << "Could not truncate pid file '" << pidfile << "': "
3111 << strerror(errno) << "\n";
3112 }
3113 }
3114
3115 char pid_str[8];
3116 snprintf(pid_str, sizeof(pid_str), "%d", getpid());
3117 int bytes = static_cast<int>(strlen(pid_str));
3118 if (write(fd, pid_str, strlen(pid_str)) != bytes) {
3119 cerr << "Could not write pid file: " << strerror(errno) << "\n";
3120 close(fd);
3121 exit(1);
3122 }
3123
3124 return fd;
3125 }
3126
3003 int main (int argc, char**argv) 3127 int main (int argc, char**argv)
3004 { 3128 {
3005 unsigned int i = 0; 3129 unsigned int i = 0;
3006 bool wait_for_iface = false; 3130 bool wait_for_iface = false;
3131 int pidfile_fd;
3007 3132
3008 signal(SIGPIPE, SIG_IGN); 3133 signal(SIGPIPE, SIG_IGN);
3134 signal(SIGTERM, SignalHandler);
3135 signal(SIGINT, SignalHandler);
3136 signal(SIGHUP, SignalHandler);
3009 3137
3010 CommandLine::Init(argc, argv); 3138 CommandLine::Init(argc, argv);
3011 CommandLine cl(argc, argv); 3139 CommandLine cl(argc, argv);
3012 3140
3013 if (cl.HasSwitch("help") || argc < 2) { 3141 if (cl.HasSwitch("help") || argc < 2) {
3014 cout << argv[0] << " <options>\n"; 3142 cout << argv[0] << " <options>\n";
3015 cout << " Proxy options:\n"; 3143 cout << " Proxy options:\n";
3016 cout << "\t--proxy<1..n>=\"<listen ip>,<listen port>," 3144 cout << "\t--proxy<1..n>=\"<listen ip>,<listen port>,"
3017 << "<ssl cert filename>,\n" 3145 << "<ssl cert filename>,\n"
3018 << "\t <ssl key filename>,<http server ip>," 3146 << "\t <ssl key filename>,<http server ip>,"
3019 << "<http server port>,\n" 3147 << "<http server port>,\n"
3020 << "\t [https server ip],[https server port]," 3148 << "\t [https server ip],[https server port],"
3021 << "<spdy only 0|1>\"\n"; 3149 << "<spdy only 0|1>\"\n";
3022 cout << "\t * The https server ip and port may be left empty if they are" 3150 cout << "\t * The https server ip and port may be left empty if they are"
3023 << " the same as\n" 3151 << " the same as\n"
3024 << "\t the http server fields.\n"; 3152 << "\t the http server fields.\n";
3025 cout << "\t * spdy only prevents non-spdy https connections from being" 3153 cout << "\t * spdy only prevents non-spdy https connections from being"
3026 << " passed\n" 3154 << " passed\n"
3027 << "\t through the proxy listen ip:port.\n"; 3155 << "\t through the proxy listen ip:port.\n";
3028 cout << "\t--forward-ip-header=<header name>\n"; 3156 cout << "\t--forward-ip-header=<header name>\n";
3029 cout << "\n Server options:\n"; 3157 cout << "\n Server options:\n";
3030 cout << "\t--spdy-server=\"<listen ip>,<listen port>,[ssl cert filename],\n" 3158 cout << "\t--spdy-server=\"<listen ip>,<listen port>,[ssl cert filename],"
3031 << "\t [ssl key filename]\"\n"; 3159 << "\n\t [ssl key filename]\"\n";
3032 cout << "\t--http-server=\"<listen ip>,<listen port>,[ssl cert filename],\n" 3160 cout << "\t--http-server=\"<listen ip>,<listen port>,[ssl cert filename],"
3033 << "\t [ssl key filename]\"\n"; 3161 << "\n\t [ssl key filename]\"\n";
3034 cout << "\t * Leaving the ssl cert and key fields empty will disable ssl" 3162 cout << "\t * Leaving the ssl cert and key fields empty will disable ssl"
3035 << " for the\n" 3163 << " for the\n"
3036 << "\t http and spdy flip servers\n"; 3164 << "\t http and spdy flip servers\n";
3037 cout << "\n Global options:\n"; 3165 cout << "\n Global options:\n";
3038 cout << "\t--logdest=<file|system|both>\n"; 3166 cout << "\t--logdest=<file|system|both>\n";
3039 cout << "\t--logfile=<logfile>\n"; 3167 cout << "\t--logfile=<logfile>\n";
3040 cout << "\t--wait-for-iface\n"; 3168 cout << "\t--wait-for-iface\n";
3041 cout << "\t * The flip server will block until the listen ip has been" 3169 cout << "\t * The flip server will block until the listen ip has been"
3042 << " raised.\n"; 3170 << " raised.\n";
3043 cout << "\t--ssl-session-expiry=<seconds> (default is 300)\n"; 3171 cout << "\t--ssl-session-expiry=<seconds> (default is 300)\n";
3044 cout << "\t--ssl-disable-compression\n"; 3172 cout << "\t--ssl-disable-compression\n";
3173 cout << "\t--idle-timeout=<seconds> (default is 300)\n";
3174 cout << "\t--pidfile=<filepath> (default /var/run/flip-server.pid)\n";
3045 cout << "\t--help\n"; 3175 cout << "\t--help\n";
3046 exit(0); 3176 exit(0);
3047 } 3177 }
3048 3178
3179 if (cl.HasSwitch("pidfile")) {
3180 pidfile_fd = OpenPidFile(cl.GetSwitchValueASCII("pidfile").c_str());
3181 } else {
3182 pidfile_fd = OpenPidFile(PIDFILE);
3183 }
3184
3049 g_proxy_config.server_think_time_in_s_ = FLAGS_server_think_time_in_s; 3185 g_proxy_config.server_think_time_in_s_ = FLAGS_server_think_time_in_s;
3050 3186
3051 if (cl.HasSwitch("forward-ip-header")) { 3187 if (cl.HasSwitch("forward-ip-header")) {
3052 g_proxy_config.forward_ip_header_enabled_ = true; 3188 g_proxy_config.forward_ip_header_enabled_ = true;
3053 g_proxy_config.forward_ip_header_ = 3189 g_proxy_config.forward_ip_header_ =
3054 cl.GetSwitchValueASCII("forward-ip-header"); 3190 cl.GetSwitchValueASCII("forward-ip-header");
3055 } else { 3191 } else {
3056 g_proxy_config.forward_ip_header_enabled_ = false; 3192 g_proxy_config.forward_ip_header_enabled_ = false;
3057 } 3193 }
3058 3194
(...skipping 23 matching lines...) Expand all
3082 logging::LOG_TO_BOTH_FILE_AND_SYSTEM_DEBUG_LOG) { 3218 logging::LOG_TO_BOTH_FILE_AND_SYSTEM_DEBUG_LOG) {
3083 LOG(FATAL) << "Logging destination requires a log file to be specified."; 3219 LOG(FATAL) << "Logging destination requires a log file to be specified.";
3084 } 3220 }
3085 3221
3086 if (cl.HasSwitch("wait-for-iface")) { 3222 if (cl.HasSwitch("wait-for-iface")) {
3087 wait_for_iface = true; 3223 wait_for_iface = true;
3088 } 3224 }
3089 3225
3090 if (cl.HasSwitch("ssl-session-expiry")) { 3226 if (cl.HasSwitch("ssl-session-expiry")) {
3091 string session_expiry = cl.GetSwitchValueASCII("ssl-session-expiry"); 3227 string session_expiry = cl.GetSwitchValueASCII("ssl-session-expiry");
3092 g_proxy_config.ssl_session_expiry_ = atoi( session_expiry.c_str() ); 3228 g_proxy_config.ssl_session_expiry_ = atoi(session_expiry.c_str());
3093 } 3229 }
3094 3230
3095 if (cl.HasSwitch("ssl-disable-compression")) { 3231 if (cl.HasSwitch("ssl-disable-compression")) {
3096 g_proxy_config.ssl_disable_compression_ = true; 3232 g_proxy_config.ssl_disable_compression_ = true;
3097 } 3233 }
3098 3234
3235 if (cl.HasSwitch("idle-timeout")) {
3236 g_proxy_config.idle_timeout_s_ =
3237 atoi(cl.GetSwitchValueASCII("idle-timeout").c_str());
3238 }
3239
3099 if (cl.HasSwitch("force_spdy")) 3240 if (cl.HasSwitch("force_spdy"))
3100 FLAGS_force_spdy = true; 3241 FLAGS_force_spdy = true;
3101 3242
3102 InitLogging(g_proxy_config.log_filename_.c_str(), 3243 InitLogging(g_proxy_config.log_filename_.c_str(),
3103 g_proxy_config.log_destination_, 3244 g_proxy_config.log_destination_,
3104 logging::DONT_LOCK_LOG_FILE, 3245 logging::DONT_LOCK_LOG_FILE,
3105 logging::APPEND_TO_OLD_LOG_FILE, 3246 logging::APPEND_TO_OLD_LOG_FILE,
3106 logging::DISABLE_DCHECK_FOR_NON_OFFICIAL_RELEASE_BUILDS); 3247 logging::DISABLE_DCHECK_FOR_NON_OFFICIAL_RELEASE_BUILDS);
3107 3248
3108 LOG(INFO) << "Flip SPDY proxy started with configuration:"; 3249 LOG(INFO) << "Flip SPDY proxy started with configuration:";
3109 LOG(INFO) << "Logging destination : " << g_proxy_config.log_destination_; 3250 LOG(INFO) << "Logging destination : " << g_proxy_config.log_destination_;
3110 LOG(INFO) << "Log file : " << g_proxy_config.log_filename_; 3251 LOG(INFO) << "Log file : " << g_proxy_config.log_filename_;
3111 LOG(INFO) << "Forward IP Header : " 3252 LOG(INFO) << "Forward IP Header : "
3112 << (g_proxy_config.forward_ip_header_enabled_ ? 3253 << (g_proxy_config.forward_ip_header_enabled_ ?
3113 g_proxy_config.forward_ip_header_ : "(disabled)"); 3254 g_proxy_config.forward_ip_header_ : "(disabled)");
3114 LOG(INFO) << "Wait for interfaces : " << (wait_for_iface?"true":"false"); 3255 LOG(INFO) << "Wait for interfaces : " << (wait_for_iface?"true":"false");
3115 LOG(INFO) << "Accept backlog size : " << FLAGS_accept_backlog_size; 3256 LOG(INFO) << "Accept backlog size : " << FLAGS_accept_backlog_size;
3116 LOG(INFO) << "Accepts per wake : " << FLAGS_accepts_per_wake; 3257 LOG(INFO) << "Accepts per wake : " << FLAGS_accepts_per_wake;
3117 LOG(INFO) << "Disable nagle : " 3258 LOG(INFO) << "Disable nagle : "
3118 << (FLAGS_disable_nagle?"true":"false"); 3259 << (FLAGS_disable_nagle?"true":"false");
3119 LOG(INFO) << "Reuseport : " << (FLAGS_reuseport?"true":"false"); 3260 LOG(INFO) << "Reuseport : "
3261 << (FLAGS_reuseport?"true":"false");
3120 LOG(INFO) << "Force SPDY : " 3262 LOG(INFO) << "Force SPDY : "
3121 << (FLAGS_force_spdy?"true":"false"); 3263 << (FLAGS_force_spdy?"true":"false");
3122 LOG(INFO) << "SSL session expiry : " 3264 LOG(INFO) << "SSL session expiry : "
3123 << g_proxy_config.ssl_session_expiry_; 3265 << g_proxy_config.ssl_session_expiry_;
3124 LOG(INFO) << "SSL disable compression : " 3266 LOG(INFO) << "SSL disable compression : "
3125 << g_proxy_config.ssl_disable_compression_; 3267 << g_proxy_config.ssl_disable_compression_;
3268 LOG(INFO) << "Connection idle timeout : " << g_proxy_config.idle_timeout_s_;
3126 3269
3127 // Proxy Acceptors 3270 // Proxy Acceptors
3128 while (true) { 3271 while (true) {
3129 i += 1; 3272 i += 1;
3130 std::stringstream name; 3273 std::stringstream name;
3131 name << "proxy" << i; 3274 name << "proxy" << i;
3132 if (!cl.HasSwitch(name.str())) { 3275 if (!cl.HasSwitch(name.str())) {
3133 break; 3276 break;
3134 } 3277 }
3135 string value = cl.GetSwitchValueASCII(name.str()); 3278 string value = cl.GetSwitchValueASCII(name.str());
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
3202 // we either must make the MemoryCache threadsafe, or use 3345 // we either must make the MemoryCache threadsafe, or use
3203 // a separate MemoryCache for each thread. 3346 // a separate MemoryCache for each thread.
3204 // 3347 //
3205 // The latter is what is currently being done as we spawn 3348 // The latter is what is currently being done as we spawn
3206 // a separate thread for each http and spdy server acceptor. 3349 // a separate thread for each http and spdy server acceptor.
3207 3350
3208 sm_worker_threads_.back()->InitWorker(); 3351 sm_worker_threads_.back()->InitWorker();
3209 sm_worker_threads_.back()->Start(); 3352 sm_worker_threads_.back()->Start();
3210 } 3353 }
3211 3354
3212 while (true) { 3355 while (!wantExit) {
3356 // Close logfile when HUP signal is received. Logging system will
3357 // automatically reopen on next log message.
3358 if ( wantLogClose ) {
3359 wantLogClose = false;
3360 VLOG(1) << "HUP received, reopening log file.";
3361 logging::CloseLogFile();
3362 }
3213 if (GotQuitFromStdin()) { 3363 if (GotQuitFromStdin()) {
3214 for (unsigned int i = 0; i < sm_worker_threads_.size(); ++i) { 3364 for (unsigned int i = 0; i < sm_worker_threads_.size(); ++i) {
3215 sm_worker_threads_[i]->Quit(); 3365 sm_worker_threads_[i]->Quit();
3216 } 3366 }
3217 for (unsigned int i = 0; i < sm_worker_threads_.size(); ++i) { 3367 for (unsigned int i = 0; i < sm_worker_threads_.size(); ++i) {
3218 sm_worker_threads_[i]->Join(); 3368 sm_worker_threads_[i]->Join();
3219 } 3369 }
3220 return 0; 3370 break;
3221 } 3371 }
3222 usleep(1000*10); // 10 ms 3372 usleep(1000*10); // 10 ms
3223 } 3373 }
3224 3374
3375 unlink(PIDFILE);
3376 close(pidfile_fd);
3225 return 0; 3377 return 0;
3226 } 3378 }
OLDNEW
« no previous file with comments | « net/tools/flip_server/flip_config.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698