Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(630)

Unified Diff: telemetry/third_party/tsproxy/tsproxy.py

Issue 2516973005: Update ts_proxy to latest commit (Closed)
Patch Set: Sync further Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « telemetry/third_party/tsproxy/README.md ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: telemetry/third_party/tsproxy/tsproxy.py
diff --git a/telemetry/third_party/tsproxy/tsproxy.py b/telemetry/third_party/tsproxy/tsproxy.py
index fce486622597302c3f4c140eb25c306a210be085..fce58e91506f0e4afbded0adbb69ea81de2550f0 100644
--- a/telemetry/third_party/tsproxy/tsproxy.py
+++ b/telemetry/third_party/tsproxy/tsproxy.py
@@ -17,7 +17,9 @@ limitations under the License.
import asyncore
import gc
import logging
+import platform
import Queue
+import re
import signal
import socket
import sys
@@ -36,7 +38,10 @@ port_mappings = None
map_localhost = False
needs_flush = False
flush_pipes = False
+last_activity = None
REMOVE_TCP_OVERHEAD = 1460.0 / 1500.0
+lock = threading.Lock()
+background_activity_count = 0
def PrintMessage(msg):
@@ -45,7 +50,6 @@ def PrintMessage(msg):
print >> sys.stdout, msg
sys.stdout.flush()
-
########################################################################################################################
# Traffic-shaping pipe (just passthrough for now)
########################################################################################################################
@@ -65,22 +69,52 @@ class TSPipe():
if self.direction == self.PIPE_IN:
self.peer = 'client'
- def SendMessage(self, message):
- global connections
+ def SendMessage(self, message, main_thread = True):
+ global connections, in_pipe, out_pipe
+ message_sent = False
+ now = time.clock()
+ if message['message'] == 'closed':
+ message['time'] = now
+ else:
+ message['time'] = time.clock() + self.latency
+ message['size'] = .0
+ if 'data' in message:
+ message['size'] = float(len(message['data']))
try:
connection_id = message['connection']
- if connection_id in connections and self.peer in connections[connection_id]:
- now = time.clock()
- if message['message'] == 'closed':
- message['time'] = now
- else:
- message['time'] = time.clock() + self.latency
- message['size'] = .0
- if 'data' in message:
- message['size'] = float(len(message['data']))
- self.queue.put(message)
+ # Send messages directly, bypassing the queues is throttling is disabled and we are on the main thread
+ if main_thread and connection_id in connections and self.peer in connections[connection_id]and self.latency == 0 and self.kbps == .0:
+ message_sent = self.SendPeerMessage(message)
except:
pass
+ if not message_sent:
+ try:
+ self.queue.put(message)
+ except:
+ pass
+
+ def SendPeerMessage(self, message):
+ global last_activity
+ last_activity = time.clock()
+ message_sent = False
+ connection_id = message['connection']
+ if connection_id in connections:
+ if self.peer in connections[connection_id]:
+ try:
+ connections[connection_id][self.peer].handle_message(message)
+ message_sent = True
+ except:
+ # Clean up any disconnected connections
+ try:
+ connections[connection_id]['server'].close()
+ except:
+ pass
+ try:
+ connections[connection_id]['client'].close()
+ except:
+ pass
+ del connections[connection_id]
+ return message_sent
def tick(self):
global connections
@@ -99,28 +133,13 @@ class TSPipe():
# process messages as long as the next message is sendable (latency or available bytes)
while (self.next_message is not None) and\
- (flush_pipes or ((self.next_message['time'] <= now) and\
+ (flush_pipes or ((self.next_message['time'] <= now) and
(self.kbps <= .0 or self.next_message['size'] <= self.available_bytes))):
- processed_messages = True
self.queue.task_done()
- connection_id = self.next_message['connection']
- if connection_id in connections:
- if self.peer in connections[connection_id]:
- try:
- if self.kbps > .0:
- self.available_bytes -= self.next_message['size']
- connections[connection_id][self.peer].handle_message(self.next_message)
- except:
- # Clean up any disconnected connections
- try:
- connections[connection_id]['server'].close()
- except:
- pass
- try:
- connections[connection_id]['client'].close()
- except:
- pass
- del connections[connection_id]
+ processed_messages = True
+ if self.kbps > .0:
+ self.available_bytes -= self.next_message['size']
+ self.SendPeerMessage(self.next_message)
self.next_message = None
self.next_message = self.queue.get_nowait()
except:
@@ -146,14 +165,24 @@ class AsyncDNS(threading.Thread):
self.result_pipe = result_pipe
def run(self):
+ global lock, background_activity_count
try:
+ logging.debug('[{0:d}] AsyncDNS - calling getaddrinfo for {1}:{2:d}'.format(self.client_id, self.hostname, self.port))
addresses = socket.getaddrinfo(self.hostname, self.port)
logging.info('[{0:d}] Resolving {1}:{2:d} Completed'.format(self.client_id, self.hostname, self.port))
except:
addresses = ()
logging.info('[{0:d}] Resolving {1}:{2:d} Failed'.format(self.client_id, self.hostname, self.port))
message = {'message': 'resolved', 'connection': self.client_id, 'addresses': addresses}
- self.result_pipe.SendMessage(message)
+ self.result_pipe.SendMessage(message, False)
+ lock.acquire()
+ if background_activity_count > 0:
+ background_activity_count -= 1
+ lock.release()
+ # open and close a local socket which will interrupt the long polling loop to process the message
+ s = socket.socket()
+ s.connect((server.ipaddr, server.port))
+ s.close()
########################################################################################################################
@@ -178,10 +207,8 @@ class TCPConnection(asyncore.dispatcher):
self.port = None
self.needs_config = True
self.needs_close = False
- self.read_available = False
- self.window_available = options.window
self.is_localhost = False
- self.did_resolve = False;
+ self.did_resolve = False
def SendMessage(self, type, message):
message['message'] = type
@@ -189,15 +216,10 @@ class TCPConnection(asyncore.dispatcher):
in_pipe.SendMessage(message)
def handle_message(self, message):
- if message['message'] == 'data' and 'data' in message and len(message['data']) and self.state == self.STATE_CONNECTED:
- if not self.needs_close:
- self.buffer += message['data']
- self.SendMessage('ack', {})
- elif message['message'] == 'ack':
- # Increase the congestion window by 2 packets for every packet transmitted up to 350 packets (~512KB)
- self.window_available = min(self.window_available + 2, 350)
- if self.read_available:
- self.handle_read()
+ if message['message'] == 'data' and 'data' in message and len(message['data']):
+ self.buffer += message['data']
+ if self.state == self.STATE_CONNECTED:
+ self.handle_write()
elif message['message'] == 'resolve':
self.HandleResolve(message)
elif message['message'] == 'connect':
@@ -228,35 +250,36 @@ class TCPConnection(asyncore.dispatcher):
except:
pass
- def writable(self):
+ def handle_connect(self):
if self.state == self.STATE_CONNECTING:
self.state = self.STATE_CONNECTED
self.SendMessage('connected', {'success': True, 'address': self.addr})
logging.info('[{0:d}] Connected'.format(self.client_id))
- return (len(self.buffer) > 0 and self.state == self.STATE_CONNECTED)
+ self.handle_write()
+
+ def writable(self):
+ if self.state == self.STATE_CONNECTING:
+ return True
+ return len(self.buffer) > 0
def handle_write(self):
if self.needs_config:
self.needs_config = False
self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
- sent = self.send(self.buffer)
- logging.debug('[{0:d}] TCP => {1:d} byte(s)'.format(self.client_id, sent))
- self.buffer = self.buffer[sent:]
- if self.needs_close and len(self.buffer) == 0:
- self.needs_close = False
- self.handle_close()
+ if len(self.buffer) > 0:
+ sent = self.send(self.buffer)
+ logging.debug('[{0:d}] TCP => {1:d} byte(s)'.format(self.client_id, sent))
+ self.buffer = self.buffer[sent:]
+ if self.needs_close and len(self.buffer) == 0:
+ self.needs_close = False
+ self.handle_close()
def handle_read(self):
- if self.window_available == 0:
- self.read_available = True
- return
- self.read_available = False
try:
- while self.window_available > 0:
+ while True:
data = self.recv(1460)
if data:
if self.state == self.STATE_CONNECTED:
- self.window_available -= 1
logging.debug('[{0:d}] TCP <= {1:d} byte(s)'.format(self.client_id, len(data)))
self.SendMessage('data', {'data': data})
else:
@@ -265,8 +288,7 @@ class TCPConnection(asyncore.dispatcher):
pass
def HandleResolve(self, message):
- global in_pipe
- global map_localhost
+ global in_pipe, map_localhost, lock, background_activity_count
self.did_resolve = True
if 'hostname' in message:
self.hostname = message['hostname']
@@ -280,8 +302,12 @@ class TCPConnection(asyncore.dispatcher):
logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id))
self.is_localhost = True
if (dest_addresses is not None) and (not self.is_localhost or map_localhost):
+ logging.info('[{0:d}] Resolving {1}:{2:d} to mapped address {3}'.format(self.client_id, self.hostname, self.port, dest_addresses))
self.SendMessage('resolved', {'addresses': dest_addresses})
else:
+ lock.acquire()
+ background_activity_count += 1
+ lock.release()
self.state = self.STATE_RESOLVING
self.dns_thread = AsyncDNS(self.client_id, self.hostname, self.port, in_pipe)
self.dns_thread.start()
@@ -361,8 +387,6 @@ class Socks5Connection(asyncore.dispatcher):
self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1460)
self.needs_close = False
- self.read_available = False
- self.window_available = options.window
def SendMessage(self, type, message):
message['message'] = type
@@ -370,52 +394,47 @@ class Socks5Connection(asyncore.dispatcher):
out_pipe.SendMessage(message)
def handle_message(self, message):
- if message['message'] == 'data' and 'data' in message and len(message['data']) and self.state == self.STATE_CONNECTED:
- if not self.needs_close:
- self.buffer += message['data']
- self.SendMessage('ack', {})
- elif message['message'] == 'ack':
- # Increase the congestion window by 2 packets for every packet transmitted up to 350 packets (~512KB)
- self.window_available = min(self.window_available + 2, 350)
- if self.read_available:
- self.handle_read()
+ if message['message'] == 'data' and 'data' in message and len(message['data']) > 0:
+ self.buffer += message['data']
+ if self.state == self.STATE_CONNECTED:
+ self.handle_write()
elif message['message'] == 'resolved':
self.HandleResolved(message)
elif message['message'] == 'connected':
self.HandleConnected(message)
+ self.handle_write()
elif message['message'] == 'closed':
if len(self.buffer) == 0:
+ logging.info('[{0:d}] Server connection close being processed, closing Browser connection'.format(self.client_id))
self.handle_close()
else:
+ logging.info('[{0:d}] Server connection close being processed, queuing browser connection close'.format(self.client_id))
self.needs_close = True
def writable(self):
- return (len(self.buffer) > 0)
+ return len(self.buffer) > 0
def handle_write(self):
- sent = self.send(self.buffer)
- logging.debug('[{0:d}] SOCKS <= {1:d} byte(s)'.format(self.client_id, sent))
- self.buffer = self.buffer[sent:]
- if self.needs_close and len(self.buffer) == 0:
- self.needs_close = False
- self.handle_close()
+ if len(self.buffer) > 0:
+ sent = self.send(self.buffer)
+ logging.debug('[{0:d}] SOCKS <= {1:d} byte(s)'.format(self.client_id, sent))
+ self.buffer = self.buffer[sent:]
+ if self.needs_close and len(self.buffer) == 0:
+ logging.info('[{0:d}] queued browser connection close being processed, closing Browser connection'.format(self.client_id))
+ self.needs_close = False
+ self.handle_close()
def handle_read(self):
global connections
global dns_cache
- if self.window_available == 0:
- self.read_available = True
- return
- self.read_available = False
try:
- while self.window_available > 0:
+ while True:
# Consume in up-to packet-sized chunks (TCP packet payload as 1460 bytes from 1500 byte ethernet frames)
data = self.recv(1460)
if data:
data_len = len(data)
if self.state == self.STATE_CONNECTED:
logging.debug('[{0:d}] SOCKS => {1:d} byte(s)'.format(self.client_id, data_len))
- self.window_available -= 1
self.SendMessage('data', {'data': data})
elif self.state == self.STATE_WAITING_FOR_HANDSHAKE:
self.state = self.STATE_ERROR #default to an error state, set correctly if things work out
@@ -433,6 +452,7 @@ class Socks5Connection(asyncore.dispatcher):
response = chr(0x05) + chr(0x00)
self.state = self.STATE_WAITING_FOR_CONNECT_REQUEST
self.buffer += response
+ self.handle_write()
elif self.state == self.STATE_WAITING_FOR_CONNECT_REQUEST:
self.state = self.STATE_ERROR #default to an error state, set correctly if things work out
if data_len >= 10 and ord(data[0]) == 0x05 and ord(data[2]) == 0x00:
@@ -468,6 +488,7 @@ class Socks5Connection(asyncore.dispatcher):
self.SendMessage('resolve', {'hostname': self.hostname, 'port': self.port})
elif self.ip is not None:
self.state = self.STATE_CONNECTING
+ logging.debug('[{0:d}] Socks Connect - calling getaddrinfo for {1}:{2:d}'.format(self.client_id, self.ip, self.port))
self.addresses = socket.getaddrinfo(self.ip, self.port)
self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
else:
@@ -476,7 +497,7 @@ class Socks5Connection(asyncore.dispatcher):
pass
def handle_close(self):
- logging.info('[{0:d}] Browser Connection Closed'.format(self.client_id))
+ logging.info('[{0:d}] Browser Connection Closed by browser'.format(self.client_id))
self.state = self.STATE_ERROR
self.close()
try:
@@ -503,6 +524,7 @@ class Socks5Connection(asyncore.dispatcher):
# Send host unreachable error
self.state = self.STATE_ERROR
self.buffer += chr(0x05) + chr(0x04) + self.requested_address
+ self.handle_write()
def HandleConnected(self, message):
if 'success' in message and self.state == self.STATE_CONNECTING:
@@ -517,6 +539,7 @@ class Socks5Connection(asyncore.dispatcher):
response += chr(0x00)
response += self.requested_address
self.buffer += response
+ self.handle_write()
########################################################################################################################
@@ -539,37 +562,57 @@ class CommandProcessor():
global out_pipe
global needs_flush
global REMOVE_TCP_OVERHEAD
+ global port_mappings
+ global server
if len(input):
ok = False
try:
command = input.split()
if len(command) and len(command[0]):
if command[0].lower() == 'flush':
- needs_flush = True
- ok = True
- elif len(command) >= 3 and command[0].lower() == 'set' and command[1].lower() == 'rtt' and len(command[2]):
- rtt = float(command[2])
- latency = rtt / 2000.0
- in_pipe.latency = latency
- out_pipe.latency = latency
- needs_flush = True
- ok = True
- elif len(command) >= 3 and command[0].lower() == 'set' and command[1].lower() == 'inkbps' and len(command[2]):
- in_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
- needs_flush = True
- ok = True
- elif len(command) >= 3 and command[0].lower() == 'set' and command[1].lower() == 'outkbps' and len(command[2]):
- out_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
- needs_flush = True
ok = True
- elif len(command) >= 3 and command[0].lower() == 'set' and command[1].lower() == 'mapports' and len(command[2]):
- SetPortMappings(command[2])
+ elif command[0].lower() == 'set' and len(command) >= 3:
+ if command[1].lower() == 'rtt' and len(command[2]):
+ rtt = float(command[2])
+ latency = rtt / 2000.0
+ in_pipe.latency = latency
+ out_pipe.latency = latency
+ ok = True
+ elif command[1].lower() == 'inkbps' and len(command[2]):
+ in_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
+ ok = True
+ elif command[1].lower() == 'outkbps' and len(command[2]):
+ out_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
+ ok = True
+ elif command[1].lower() == 'mapports' and len(command[2]):
+ SetPortMappings(command[2])
+ ok = True
+ elif command[0].lower() == 'reset' and len(command) >= 2:
+ if command[1].lower() == 'rtt' or command[1].lower() == 'all':
+ in_pipe.latency = 0
+ out_pipe.latency = 0
+ ok = True
+ if command[1].lower() == 'inkbps' or command[1].lower() == 'all':
+ in_pipe.kbps = 0
+ ok = True
+ if command[1].lower() == 'outkbps' or command[1].lower() == 'all':
+ out_pipe.kbps = 0
+ ok = True
+ if command[1].lower() == 'mapports' or command[1].lower() == 'all':
+ port_mappings = {}
+ ok = True
+
+ if ok:
needs_flush = True
- ok = True
except:
pass
if not ok:
PrintMessage('ERROR')
+ # open and close a local socket which will interrupt the long polling loop to process the flush
+ if needs_flush:
+ s = socket.socket()
+ s.connect((server.ipaddr, server.port))
+ s.close()
########################################################################################################################
@@ -620,6 +663,7 @@ def main():
# Resolve the address for a rewrite destination host if one was specified
if options.desthost:
+ logging.debug('Startup - calling getaddrinfo for {0}:{1:d}'.format(options.desthost, GetDestPort(80)))
dest_addresses = socket.getaddrinfo(options.desthost, GetDestPort(80))
# Set up the pipes. 1/2 of the latency gets applied in each direction (and /1000 to convert to seconds)
@@ -647,33 +691,53 @@ def run_loop():
global out_pipe
global needs_flush
global flush_pipes
- gc_check_count = 0
+ global last_activity
+ winmm = None
+
+ # increase the windows timer resolution to 1ms
+ if platform.system() == "Windows":
+ try:
+ import ctypes
+ winmm = ctypes.WinDLL('winmm')
+ winmm.timeBeginPeriod(1)
+ except:
+ pass
+
last_activity = time.clock()
+ last_check = time.clock()
# disable gc to avoid pauses during traffic shaping/proxying
gc.disable()
while not must_exit:
- asyncore.poll(0.001, asyncore.socket_map)
+ # Tick every 1ms if traffic-shaping is enabled and we have data or are doing background dns lookups, every 1 second otherwise
+ lock.acquire()
+ tick_interval = 0.001
+ if background_activity_count == 0:
+ if in_pipe.next_message is None and in_pipe.queue.empty() and out_pipe.next_message is None and out_pipe.queue.empty():
+ tick_interval = 1.0
+ elif in_pipe.kbps == .0 and in_pipe.latency == 0 and out_pipe.kbps == .0 and out_pipe.latency == 0:
+ tick_interval = 1.0
+ lock.release()
+ asyncore.poll(tick_interval, asyncore.socket_map)
if needs_flush:
flush_pipes = True
needs_flush = False
- if in_pipe.tick():
- last_activity = time.clock()
- if out_pipe.tick():
- last_activity = time.clock()
+ out_pipe.tick()
+ in_pipe.tick()
if flush_pipes:
PrintMessage('OK')
flush_pipes = False
- # Every 500 loops (~0.5 second) check to see if it is a good time to do a gc
- if gc_check_count > 1000:
- gc_check_count = 0
+ # Every 500 ms check to see if it is a good time to do a gc
+ now = time.clock()
+ if now - last_check > 0.5:
+ last_check = now
# manually gc after 5 seconds of idle
- if time.clock() - last_activity >= 5:
- last_activity = time.clock()
+ if now - last_activity >= 5:
+ last_activity = now
logging.debug("Triggering manual GC")
gc.collect()
- else:
- gc_check_count += 1
+ if winmm is not None:
+ winmm.timeEndPeriod(1)
def GetDestPort(port):
global port_mappings
« no previous file with comments | « telemetry/third_party/tsproxy/README.md ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698