| Index: telemetry/third_party/tsproxy/tsproxy.py
|
| diff --git a/telemetry/third_party/tsproxy/tsproxy.py b/telemetry/third_party/tsproxy/tsproxy.py
|
| index fce486622597302c3f4c140eb25c306a210be085..fce58e91506f0e4afbded0adbb69ea81de2550f0 100644
|
| --- a/telemetry/third_party/tsproxy/tsproxy.py
|
| +++ b/telemetry/third_party/tsproxy/tsproxy.py
|
| @@ -17,7 +17,9 @@ limitations under the License.
|
| import asyncore
|
| import gc
|
| import logging
|
| +import platform
|
| import Queue
|
| +import re
|
| import signal
|
| import socket
|
| import sys
|
| @@ -36,7 +38,10 @@ port_mappings = None
|
| map_localhost = False
|
| needs_flush = False
|
| flush_pipes = False
|
| +last_activity = None
|
| REMOVE_TCP_OVERHEAD = 1460.0 / 1500.0
|
| +lock = threading.Lock()
|
| +background_activity_count = 0
|
|
|
|
|
| def PrintMessage(msg):
|
| @@ -45,7 +50,6 @@ def PrintMessage(msg):
|
| print >> sys.stdout, msg
|
| sys.stdout.flush()
|
|
|
| -
|
| ########################################################################################################################
|
| # Traffic-shaping pipe (just passthrough for now)
|
| ########################################################################################################################
|
| @@ -65,22 +69,52 @@ class TSPipe():
|
| if self.direction == self.PIPE_IN:
|
| self.peer = 'client'
|
|
|
| - def SendMessage(self, message):
|
| - global connections
|
| + def SendMessage(self, message, main_thread = True):
|
| + global connections, in_pipe, out_pipe
|
| + message_sent = False
|
| + now = time.clock()
|
| + if message['message'] == 'closed':
|
| + message['time'] = now
|
| + else:
|
| + message['time'] = time.clock() + self.latency
|
| + message['size'] = .0
|
| + if 'data' in message:
|
| + message['size'] = float(len(message['data']))
|
| try:
|
| connection_id = message['connection']
|
| - if connection_id in connections and self.peer in connections[connection_id]:
|
| - now = time.clock()
|
| - if message['message'] == 'closed':
|
| - message['time'] = now
|
| - else:
|
| - message['time'] = time.clock() + self.latency
|
| - message['size'] = .0
|
| - if 'data' in message:
|
| - message['size'] = float(len(message['data']))
|
| - self.queue.put(message)
|
| + # Send messages directly, bypassing the queues, if throttling is disabled and we are on the main thread
|
| + if main_thread and connection_id in connections and self.peer in connections[connection_id]and self.latency == 0 and self.kbps == .0:
|
| + message_sent = self.SendPeerMessage(message)
|
| except:
|
| pass
|
| + if not message_sent:
|
| + try:
|
| + self.queue.put(message)
|
| + except:
|
| + pass
|
| +
|
| + def SendPeerMessage(self, message):
|
| + global last_activity
|
| + last_activity = time.clock()
|
| + message_sent = False
|
| + connection_id = message['connection']
|
| + if connection_id in connections:
|
| + if self.peer in connections[connection_id]:
|
| + try:
|
| + connections[connection_id][self.peer].handle_message(message)
|
| + message_sent = True
|
| + except:
|
| + # Clean up any disconnected connections
|
| + try:
|
| + connections[connection_id]['server'].close()
|
| + except:
|
| + pass
|
| + try:
|
| + connections[connection_id]['client'].close()
|
| + except:
|
| + pass
|
| + del connections[connection_id]
|
| + return message_sent
|
|
|
| def tick(self):
|
| global connections
|
| @@ -99,28 +133,13 @@ class TSPipe():
|
|
|
| # process messages as long as the next message is sendable (latency or available bytes)
|
| while (self.next_message is not None) and\
|
| - (flush_pipes or ((self.next_message['time'] <= now) and\
|
| + (flush_pipes or ((self.next_message['time'] <= now) and
|
| (self.kbps <= .0 or self.next_message['size'] <= self.available_bytes))):
|
| - processed_messages = True
|
| self.queue.task_done()
|
| - connection_id = self.next_message['connection']
|
| - if connection_id in connections:
|
| - if self.peer in connections[connection_id]:
|
| - try:
|
| - if self.kbps > .0:
|
| - self.available_bytes -= self.next_message['size']
|
| - connections[connection_id][self.peer].handle_message(self.next_message)
|
| - except:
|
| - # Clean up any disconnected connections
|
| - try:
|
| - connections[connection_id]['server'].close()
|
| - except:
|
| - pass
|
| - try:
|
| - connections[connection_id]['client'].close()
|
| - except:
|
| - pass
|
| - del connections[connection_id]
|
| + processed_messages = True
|
| + if self.kbps > .0:
|
| + self.available_bytes -= self.next_message['size']
|
| + self.SendPeerMessage(self.next_message)
|
| self.next_message = None
|
| self.next_message = self.queue.get_nowait()
|
| except:
|
| @@ -146,14 +165,24 @@ class AsyncDNS(threading.Thread):
|
| self.result_pipe = result_pipe
|
|
|
| def run(self):
|
| + global lock, background_activity_count
|
| try:
|
| + logging.debug('[{0:d}] AsyncDNS - calling getaddrinfo for {1}:{2:d}'.format(self.client_id, self.hostname, self.port))
|
| addresses = socket.getaddrinfo(self.hostname, self.port)
|
| logging.info('[{0:d}] Resolving {1}:{2:d} Completed'.format(self.client_id, self.hostname, self.port))
|
| except:
|
| addresses = ()
|
| logging.info('[{0:d}] Resolving {1}:{2:d} Failed'.format(self.client_id, self.hostname, self.port))
|
| message = {'message': 'resolved', 'connection': self.client_id, 'addresses': addresses}
|
| - self.result_pipe.SendMessage(message)
|
| + self.result_pipe.SendMessage(message, False)
|
| + lock.acquire()
|
| + if background_activity_count > 0:
|
| + background_activity_count -= 1
|
| + lock.release()
|
| + # open and close a local socket which will interrupt the long polling loop to process the message
|
| + s = socket.socket()
|
| + s.connect((server.ipaddr, server.port))
|
| + s.close()
|
|
|
|
|
| ########################################################################################################################
|
| @@ -178,10 +207,8 @@ class TCPConnection(asyncore.dispatcher):
|
| self.port = None
|
| self.needs_config = True
|
| self.needs_close = False
|
| - self.read_available = False
|
| - self.window_available = options.window
|
| self.is_localhost = False
|
| - self.did_resolve = False;
|
| + self.did_resolve = False
|
|
|
| def SendMessage(self, type, message):
|
| message['message'] = type
|
| @@ -189,15 +216,10 @@ class TCPConnection(asyncore.dispatcher):
|
| in_pipe.SendMessage(message)
|
|
|
| def handle_message(self, message):
|
| - if message['message'] == 'data' and 'data' in message and len(message['data']) and self.state == self.STATE_CONNECTED:
|
| - if not self.needs_close:
|
| - self.buffer += message['data']
|
| - self.SendMessage('ack', {})
|
| - elif message['message'] == 'ack':
|
| - # Increase the congestion window by 2 packets for every packet transmitted up to 350 packets (~512KB)
|
| - self.window_available = min(self.window_available + 2, 350)
|
| - if self.read_available:
|
| - self.handle_read()
|
| + if message['message'] == 'data' and 'data' in message and len(message['data']):
|
| + self.buffer += message['data']
|
| + if self.state == self.STATE_CONNECTED:
|
| + self.handle_write()
|
| elif message['message'] == 'resolve':
|
| self.HandleResolve(message)
|
| elif message['message'] == 'connect':
|
| @@ -228,35 +250,36 @@ class TCPConnection(asyncore.dispatcher):
|
| except:
|
| pass
|
|
|
| - def writable(self):
|
| + def handle_connect(self):
|
| if self.state == self.STATE_CONNECTING:
|
| self.state = self.STATE_CONNECTED
|
| self.SendMessage('connected', {'success': True, 'address': self.addr})
|
| logging.info('[{0:d}] Connected'.format(self.client_id))
|
| - return (len(self.buffer) > 0 and self.state == self.STATE_CONNECTED)
|
| + self.handle_write()
|
| +
|
| + def writable(self):
|
| + if self.state == self.STATE_CONNECTING:
|
| + return True
|
| + return len(self.buffer) > 0
|
|
|
| def handle_write(self):
|
| if self.needs_config:
|
| self.needs_config = False
|
| self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
| - sent = self.send(self.buffer)
|
| - logging.debug('[{0:d}] TCP => {1:d} byte(s)'.format(self.client_id, sent))
|
| - self.buffer = self.buffer[sent:]
|
| - if self.needs_close and len(self.buffer) == 0:
|
| - self.needs_close = False
|
| - self.handle_close()
|
| + if len(self.buffer) > 0:
|
| + sent = self.send(self.buffer)
|
| + logging.debug('[{0:d}] TCP => {1:d} byte(s)'.format(self.client_id, sent))
|
| + self.buffer = self.buffer[sent:]
|
| + if self.needs_close and len(self.buffer) == 0:
|
| + self.needs_close = False
|
| + self.handle_close()
|
|
|
| def handle_read(self):
|
| - if self.window_available == 0:
|
| - self.read_available = True
|
| - return
|
| - self.read_available = False
|
| try:
|
| - while self.window_available > 0:
|
| + while True:
|
| data = self.recv(1460)
|
| if data:
|
| if self.state == self.STATE_CONNECTED:
|
| - self.window_available -= 1
|
| logging.debug('[{0:d}] TCP <= {1:d} byte(s)'.format(self.client_id, len(data)))
|
| self.SendMessage('data', {'data': data})
|
| else:
|
| @@ -265,8 +288,7 @@ class TCPConnection(asyncore.dispatcher):
|
| pass
|
|
|
| def HandleResolve(self, message):
|
| - global in_pipe
|
| - global map_localhost
|
| + global in_pipe, map_localhost, lock, background_activity_count
|
| self.did_resolve = True
|
| if 'hostname' in message:
|
| self.hostname = message['hostname']
|
| @@ -280,8 +302,12 @@ class TCPConnection(asyncore.dispatcher):
|
| logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id))
|
| self.is_localhost = True
|
| if (dest_addresses is not None) and (not self.is_localhost or map_localhost):
|
| + logging.info('[{0:d}] Resolving {1}:{2:d} to mapped address {3}'.format(self.client_id, self.hostname, self.port, dest_addresses))
|
| self.SendMessage('resolved', {'addresses': dest_addresses})
|
| else:
|
| + lock.acquire()
|
| + background_activity_count += 1
|
| + lock.release()
|
| self.state = self.STATE_RESOLVING
|
| self.dns_thread = AsyncDNS(self.client_id, self.hostname, self.port, in_pipe)
|
| self.dns_thread.start()
|
| @@ -361,8 +387,6 @@ class Socks5Connection(asyncore.dispatcher):
|
| self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
| self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1460)
|
| self.needs_close = False
|
| - self.read_available = False
|
| - self.window_available = options.window
|
|
|
| def SendMessage(self, type, message):
|
| message['message'] = type
|
| @@ -370,52 +394,47 @@ class Socks5Connection(asyncore.dispatcher):
|
| out_pipe.SendMessage(message)
|
|
|
| def handle_message(self, message):
|
| - if message['message'] == 'data' and 'data' in message and len(message['data']) and self.state == self.STATE_CONNECTED:
|
| - if not self.needs_close:
|
| - self.buffer += message['data']
|
| - self.SendMessage('ack', {})
|
| - elif message['message'] == 'ack':
|
| - # Increase the congestion window by 2 packets for every packet transmitted up to 350 packets (~512KB)
|
| - self.window_available = min(self.window_available + 2, 350)
|
| - if self.read_available:
|
| - self.handle_read()
|
| + if message['message'] == 'data' and 'data' in message and len(message['data']) > 0:
|
| + self.buffer += message['data']
|
| + if self.state == self.STATE_CONNECTED:
|
| + self.handle_write()
|
| elif message['message'] == 'resolved':
|
| self.HandleResolved(message)
|
| elif message['message'] == 'connected':
|
| self.HandleConnected(message)
|
| + self.handle_write()
|
| elif message['message'] == 'closed':
|
| if len(self.buffer) == 0:
|
| + logging.info('[{0:d}] Server connection close being processed, closing Browser connection'.format(self.client_id))
|
| self.handle_close()
|
| else:
|
| + logging.info('[{0:d}] Server connection close being processed, queuing browser connection close'.format(self.client_id))
|
| self.needs_close = True
|
|
|
| def writable(self):
|
| - return (len(self.buffer) > 0)
|
| + return len(self.buffer) > 0
|
|
|
| def handle_write(self):
|
| - sent = self.send(self.buffer)
|
| - logging.debug('[{0:d}] SOCKS <= {1:d} byte(s)'.format(self.client_id, sent))
|
| - self.buffer = self.buffer[sent:]
|
| - if self.needs_close and len(self.buffer) == 0:
|
| - self.needs_close = False
|
| - self.handle_close()
|
| + if len(self.buffer) > 0:
|
| + sent = self.send(self.buffer)
|
| + logging.debug('[{0:d}] SOCKS <= {1:d} byte(s)'.format(self.client_id, sent))
|
| + self.buffer = self.buffer[sent:]
|
| + if self.needs_close and len(self.buffer) == 0:
|
| + logging.info('[{0:d}] queued browser connection close being processed, closing Browser connection'.format(self.client_id))
|
| + self.needs_close = False
|
| + self.handle_close()
|
|
|
| def handle_read(self):
|
| global connections
|
| global dns_cache
|
| - if self.window_available == 0:
|
| - self.read_available = True
|
| - return
|
| - self.read_available = False
|
| try:
|
| - while self.window_available > 0:
|
| + while True:
|
| # Consume in up-to packet-sized chunks (TCP packet payload as 1460 bytes from 1500 byte ethernet frames)
|
| data = self.recv(1460)
|
| if data:
|
| data_len = len(data)
|
| if self.state == self.STATE_CONNECTED:
|
| logging.debug('[{0:d}] SOCKS => {1:d} byte(s)'.format(self.client_id, data_len))
|
| - self.window_available -= 1
|
| self.SendMessage('data', {'data': data})
|
| elif self.state == self.STATE_WAITING_FOR_HANDSHAKE:
|
| self.state = self.STATE_ERROR #default to an error state, set correctly if things work out
|
| @@ -433,6 +452,7 @@ class Socks5Connection(asyncore.dispatcher):
|
| response = chr(0x05) + chr(0x00)
|
| self.state = self.STATE_WAITING_FOR_CONNECT_REQUEST
|
| self.buffer += response
|
| + self.handle_write()
|
| elif self.state == self.STATE_WAITING_FOR_CONNECT_REQUEST:
|
| self.state = self.STATE_ERROR #default to an error state, set correctly if things work out
|
| if data_len >= 10 and ord(data[0]) == 0x05 and ord(data[2]) == 0x00:
|
| @@ -468,6 +488,7 @@ class Socks5Connection(asyncore.dispatcher):
|
| self.SendMessage('resolve', {'hostname': self.hostname, 'port': self.port})
|
| elif self.ip is not None:
|
| self.state = self.STATE_CONNECTING
|
| + logging.debug('[{0:d}] Socks Connect - calling getaddrinfo for {1}:{2:d}'.format(self.client_id, self.ip, self.port))
|
| self.addresses = socket.getaddrinfo(self.ip, self.port)
|
| self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
|
| else:
|
| @@ -476,7 +497,7 @@ class Socks5Connection(asyncore.dispatcher):
|
| pass
|
|
|
| def handle_close(self):
|
| - logging.info('[{0:d}] Browser Connection Closed'.format(self.client_id))
|
| + logging.info('[{0:d}] Browser Connection Closed by browser'.format(self.client_id))
|
| self.state = self.STATE_ERROR
|
| self.close()
|
| try:
|
| @@ -503,6 +524,7 @@ class Socks5Connection(asyncore.dispatcher):
|
| # Send host unreachable error
|
| self.state = self.STATE_ERROR
|
| self.buffer += chr(0x05) + chr(0x04) + self.requested_address
|
| + self.handle_write()
|
|
|
| def HandleConnected(self, message):
|
| if 'success' in message and self.state == self.STATE_CONNECTING:
|
| @@ -517,6 +539,7 @@ class Socks5Connection(asyncore.dispatcher):
|
| response += chr(0x00)
|
| response += self.requested_address
|
| self.buffer += response
|
| + self.handle_write()
|
|
|
|
|
| ########################################################################################################################
|
| @@ -539,37 +562,57 @@ class CommandProcessor():
|
| global out_pipe
|
| global needs_flush
|
| global REMOVE_TCP_OVERHEAD
|
| + global port_mappings
|
| + global server
|
| if len(input):
|
| ok = False
|
| try:
|
| command = input.split()
|
| if len(command) and len(command[0]):
|
| if command[0].lower() == 'flush':
|
| - needs_flush = True
|
| - ok = True
|
| - elif len(command) >= 3 and command[0].lower() == 'set' and command[1].lower() == 'rtt' and len(command[2]):
|
| - rtt = float(command[2])
|
| - latency = rtt / 2000.0
|
| - in_pipe.latency = latency
|
| - out_pipe.latency = latency
|
| - needs_flush = True
|
| - ok = True
|
| - elif len(command) >= 3 and command[0].lower() == 'set' and command[1].lower() == 'inkbps' and len(command[2]):
|
| - in_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
|
| - needs_flush = True
|
| - ok = True
|
| - elif len(command) >= 3 and command[0].lower() == 'set' and command[1].lower() == 'outkbps' and len(command[2]):
|
| - out_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
|
| - needs_flush = True
|
| ok = True
|
| - elif len(command) >= 3 and command[0].lower() == 'set' and command[1].lower() == 'mapports' and len(command[2]):
|
| - SetPortMappings(command[2])
|
| + elif command[0].lower() == 'set' and len(command) >= 3:
|
| + if command[1].lower() == 'rtt' and len(command[2]):
|
| + rtt = float(command[2])
|
| + latency = rtt / 2000.0
|
| + in_pipe.latency = latency
|
| + out_pipe.latency = latency
|
| + ok = True
|
| + elif command[1].lower() == 'inkbps' and len(command[2]):
|
| + in_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
|
| + ok = True
|
| + elif command[1].lower() == 'outkbps' and len(command[2]):
|
| + out_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
|
| + ok = True
|
| + elif command[1].lower() == 'mapports' and len(command[2]):
|
| + SetPortMappings(command[2])
|
| + ok = True
|
| + elif command[0].lower() == 'reset' and len(command) >= 2:
|
| + if command[1].lower() == 'rtt' or command[1].lower() == 'all':
|
| + in_pipe.latency = 0
|
| + out_pipe.latency = 0
|
| + ok = True
|
| + if command[1].lower() == 'inkbps' or command[1].lower() == 'all':
|
| + in_pipe.kbps = 0
|
| + ok = True
|
| + if command[1].lower() == 'outkbps' or command[1].lower() == 'all':
|
| + out_pipe.kbps = 0
|
| + ok = True
|
| + if command[1].lower() == 'mapports' or command[1].lower() == 'all':
|
| + port_mappings = {}
|
| + ok = True
|
| +
|
| + if ok:
|
| needs_flush = True
|
| - ok = True
|
| except:
|
| pass
|
| if not ok:
|
| PrintMessage('ERROR')
|
| + # open and close a local socket which will interrupt the long polling loop to process the flush
|
| + if needs_flush:
|
| + s = socket.socket()
|
| + s.connect((server.ipaddr, server.port))
|
| + s.close()
|
|
|
|
|
| ########################################################################################################################
|
| @@ -620,6 +663,7 @@ def main():
|
|
|
| # Resolve the address for a rewrite destination host if one was specified
|
| if options.desthost:
|
| + logging.debug('Startup - calling getaddrinfo for {0}:{1:d}'.format(options.desthost, GetDestPort(80)))
|
| dest_addresses = socket.getaddrinfo(options.desthost, GetDestPort(80))
|
|
|
| # Set up the pipes. 1/2 of the latency gets applied in each direction (and /1000 to convert to seconds)
|
| @@ -647,33 +691,53 @@ def run_loop():
|
| global out_pipe
|
| global needs_flush
|
| global flush_pipes
|
| - gc_check_count = 0
|
| + global last_activity
|
| + winmm = None
|
| +
|
| + # increase the windows timer resolution to 1ms
|
| + if platform.system() == "Windows":
|
| + try:
|
| + import ctypes
|
| + winmm = ctypes.WinDLL('winmm')
|
| + winmm.timeBeginPeriod(1)
|
| + except:
|
| + pass
|
| +
|
| last_activity = time.clock()
|
| + last_check = time.clock()
|
| # disable gc to avoid pauses during traffic shaping/proxying
|
| gc.disable()
|
| while not must_exit:
|
| - asyncore.poll(0.001, asyncore.socket_map)
|
| + # Tick every 1ms if traffic-shaping is enabled and we have data or are doing background dns lookups, every 1 second otherwise
|
| + lock.acquire()
|
| + tick_interval = 0.001
|
| + if background_activity_count == 0:
|
| + if in_pipe.next_message is None and in_pipe.queue.empty() and out_pipe.next_message is None and out_pipe.queue.empty():
|
| + tick_interval = 1.0
|
| + elif in_pipe.kbps == .0 and in_pipe.latency == 0 and out_pipe.kbps == .0 and out_pipe.latency == 0:
|
| + tick_interval = 1.0
|
| + lock.release()
|
| + asyncore.poll(tick_interval, asyncore.socket_map)
|
| if needs_flush:
|
| flush_pipes = True
|
| needs_flush = False
|
| - if in_pipe.tick():
|
| - last_activity = time.clock()
|
| - if out_pipe.tick():
|
| - last_activity = time.clock()
|
| + out_pipe.tick()
|
| + in_pipe.tick()
|
| if flush_pipes:
|
| PrintMessage('OK')
|
| flush_pipes = False
|
| - # Every 500 loops (~0.5 second) check to see if it is a good time to do a gc
|
| - if gc_check_count > 1000:
|
| - gc_check_count = 0
|
| + # Every 500 ms check to see if it is a good time to do a gc
|
| + now = time.clock()
|
| + if now - last_check > 0.5:
|
| + last_check = now
|
| # manually gc after 5 seconds of idle
|
| - if time.clock() - last_activity >= 5:
|
| - last_activity = time.clock()
|
| + if now - last_activity >= 5:
|
| + last_activity = now
|
| logging.debug("Triggering manual GC")
|
| gc.collect()
|
| - else:
|
| - gc_check_count += 1
|
|
|
| + if winmm is not None:
|
| + winmm.timeEndPeriod(1)
|
|
|
| def GetDestPort(port):
|
| global port_mappings
|
|
|