Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(678)

Side by Side Diff: telemetry/third_party/tsproxy/tsproxy.py

Issue 2516973005: Update ts_proxy to latest commit (Closed)
Patch Set: Sync further Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « telemetry/third_party/tsproxy/README.md ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/python 1 #!/usr/bin/python
2 """ 2 """
3 Copyright 2016 Google Inc. All Rights Reserved. 3 Copyright 2016 Google Inc. All Rights Reserved.
4 4
5 Licensed under the Apache License, Version 2.0 (the "License"); 5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License. 6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at 7 You may obtain a copy of the License at
8 8
9 http://www.apache.org/licenses/LICENSE-2.0 9 http://www.apache.org/licenses/LICENSE-2.0
10 10
11 Unless required by applicable law or agreed to in writing, software 11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS, 12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and 14 See the License for the specific language governing permissions and
15 limitations under the License. 15 limitations under the License.
16 """ 16 """
17 import asyncore 17 import asyncore
18 import gc 18 import gc
19 import logging 19 import logging
20 import platform
20 import Queue 21 import Queue
22 import re
21 import signal 23 import signal
22 import socket 24 import socket
23 import sys 25 import sys
24 import threading 26 import threading
25 import time 27 import time
26 28
27 server = None 29 server = None
28 in_pipe = None 30 in_pipe = None
29 out_pipe = None 31 out_pipe = None
30 must_exit = False 32 must_exit = False
31 options = None 33 options = None
32 dest_addresses = None 34 dest_addresses = None
33 connections = {} 35 connections = {}
34 dns_cache = {} 36 dns_cache = {}
35 port_mappings = None 37 port_mappings = None
36 map_localhost = False 38 map_localhost = False
37 needs_flush = False 39 needs_flush = False
38 flush_pipes = False 40 flush_pipes = False
41 last_activity = None
39 REMOVE_TCP_OVERHEAD = 1460.0 / 1500.0 42 REMOVE_TCP_OVERHEAD = 1460.0 / 1500.0
43 lock = threading.Lock()
44 background_activity_count = 0
40 45
41 46
42 def PrintMessage(msg): 47 def PrintMessage(msg):
43 # Print the message to stdout & flush to make sure that the message is not 48 # Print the message to stdout & flush to make sure that the message is not
44 # buffered when tsproxy is run as a subprocess. 49 # buffered when tsproxy is run as a subprocess.
45 print >> sys.stdout, msg 50 print >> sys.stdout, msg
46 sys.stdout.flush() 51 sys.stdout.flush()
47 52
48
49 ################################################################################ ######################################## 53 ################################################################################ ########################################
50 # Traffic-shaping pipe (just passthrough for now) 54 # Traffic-shaping pipe (just passthrough for now)
51 ################################################################################ ######################################## 55 ################################################################################ ########################################
52 class TSPipe(): 56 class TSPipe():
53 PIPE_IN = 0 57 PIPE_IN = 0
54 PIPE_OUT = 1 58 PIPE_OUT = 1
55 59
56 def __init__(self, direction, latency, kbps): 60 def __init__(self, direction, latency, kbps):
57 self.direction = direction 61 self.direction = direction
58 self.latency = latency 62 self.latency = latency
59 self.kbps = kbps 63 self.kbps = kbps
60 self.queue = Queue.Queue() 64 self.queue = Queue.Queue()
61 self.last_tick = time.clock() 65 self.last_tick = time.clock()
62 self.next_message = None 66 self.next_message = None
63 self.available_bytes = .0 67 self.available_bytes = .0
64 self.peer = 'server' 68 self.peer = 'server'
65 if self.direction == self.PIPE_IN: 69 if self.direction == self.PIPE_IN:
66 self.peer = 'client' 70 self.peer = 'client'
67 71
68 def SendMessage(self, message): 72 def SendMessage(self, message, main_thread = True):
69 global connections 73 global connections, in_pipe, out_pipe
74 message_sent = False
75 now = time.clock()
76 if message['message'] == 'closed':
77 message['time'] = now
78 else:
79 message['time'] = time.clock() + self.latency
80 message['size'] = .0
81 if 'data' in message:
82 message['size'] = float(len(message['data']))
70 try: 83 try:
71 connection_id = message['connection'] 84 connection_id = message['connection']
72 if connection_id in connections and self.peer in connections[connection_id ]: 85 # Send messages directly, bypassing the queues is throttling is disabled a nd we are on the main thread
73 now = time.clock() 86 if main_thread and connection_id in connections and self.peer in connectio ns[connection_id]and self.latency == 0 and self.kbps == .0:
74 if message['message'] == 'closed': 87 message_sent = self.SendPeerMessage(message)
75 message['time'] = now
76 else:
77 message['time'] = time.clock() + self.latency
78 message['size'] = .0
79 if 'data' in message:
80 message['size'] = float(len(message['data']))
81 self.queue.put(message)
82 except: 88 except:
83 pass 89 pass
90 if not message_sent:
91 try:
92 self.queue.put(message)
93 except:
94 pass
95
96 def SendPeerMessage(self, message):
97 global last_activity
98 last_activity = time.clock()
99 message_sent = False
100 connection_id = message['connection']
101 if connection_id in connections:
102 if self.peer in connections[connection_id]:
103 try:
104 connections[connection_id][self.peer].handle_message(message)
105 message_sent = True
106 except:
107 # Clean up any disconnected connections
108 try:
109 connections[connection_id]['server'].close()
110 except:
111 pass
112 try:
113 connections[connection_id]['client'].close()
114 except:
115 pass
116 del connections[connection_id]
117 return message_sent
84 118
85 def tick(self): 119 def tick(self):
86 global connections 120 global connections
87 global flush_pipes 121 global flush_pipes
88 processed_messages = False 122 processed_messages = False
89 now = time.clock() 123 now = time.clock()
90 try: 124 try:
91 if self.next_message is None: 125 if self.next_message is None:
92 self.next_message = self.queue.get_nowait() 126 self.next_message = self.queue.get_nowait()
93 127
94 # Accumulate bandwidth if an available packet/message was waiting since ou r last tick 128 # Accumulate bandwidth if an available packet/message was waiting since ou r last tick
95 if self.next_message is not None and self.kbps > .0 and self.next_message[ 'time'] <= now: 129 if self.next_message is not None and self.kbps > .0 and self.next_message[ 'time'] <= now:
96 elapsed = now - self.last_tick 130 elapsed = now - self.last_tick
97 accumulated_bytes = elapsed * self.kbps * 1000.0 / 8.0 131 accumulated_bytes = elapsed * self.kbps * 1000.0 / 8.0
98 self.available_bytes += accumulated_bytes 132 self.available_bytes += accumulated_bytes
99 133
100 # process messages as long as the next message is sendable (latency or ava ilable bytes) 134 # process messages as long as the next message is sendable (latency or ava ilable bytes)
101 while (self.next_message is not None) and\ 135 while (self.next_message is not None) and\
102 (flush_pipes or ((self.next_message['time'] <= now) and\ 136 (flush_pipes or ((self.next_message['time'] <= now) and
103 (self.kbps <= .0 or self.next_message['size'] <= self. available_bytes))): 137 (self.kbps <= .0 or self.next_message['size'] <= self. available_bytes))):
138 self.queue.task_done()
104 processed_messages = True 139 processed_messages = True
105 self.queue.task_done() 140 if self.kbps > .0:
106 connection_id = self.next_message['connection'] 141 self.available_bytes -= self.next_message['size']
107 if connection_id in connections: 142 self.SendPeerMessage(self.next_message)
108 if self.peer in connections[connection_id]:
109 try:
110 if self.kbps > .0:
111 self.available_bytes -= self.next_message['size']
112 connections[connection_id][self.peer].handle_message(self.next_mes sage)
113 except:
114 # Clean up any disconnected connections
115 try:
116 connections[connection_id]['server'].close()
117 except:
118 pass
119 try:
120 connections[connection_id]['client'].close()
121 except:
122 pass
123 del connections[connection_id]
124 self.next_message = None 143 self.next_message = None
125 self.next_message = self.queue.get_nowait() 144 self.next_message = self.queue.get_nowait()
126 except: 145 except:
127 pass 146 pass
128 147
129 # Only accumulate bytes while we have messages that are ready to send 148 # Only accumulate bytes while we have messages that are ready to send
130 if self.next_message is None or self.next_message['time'] > now: 149 if self.next_message is None or self.next_message['time'] > now:
131 self.available_bytes = .0 150 self.available_bytes = .0
132 self.last_tick = now 151 self.last_tick = now
133 152
134 return processed_messages 153 return processed_messages
135 154
136 155
137 ################################################################################ ######################################## 156 ################################################################################ ########################################
138 # Threaded DNS resolver 157 # Threaded DNS resolver
139 ################################################################################ ######################################## 158 ################################################################################ ########################################
140 class AsyncDNS(threading.Thread): 159 class AsyncDNS(threading.Thread):
141 def __init__(self, client_id, hostname, port, result_pipe): 160 def __init__(self, client_id, hostname, port, result_pipe):
142 threading.Thread.__init__(self) 161 threading.Thread.__init__(self)
143 self.hostname = hostname 162 self.hostname = hostname
144 self.port = port 163 self.port = port
145 self.client_id = client_id 164 self.client_id = client_id
146 self.result_pipe = result_pipe 165 self.result_pipe = result_pipe
147 166
148 def run(self): 167 def run(self):
168 global lock, background_activity_count
149 try: 169 try:
170 logging.debug('[{0:d}] AsyncDNS - calling getaddrinfo for {1}:{2:d}'.forma t(self.client_id, self.hostname, self.port))
150 addresses = socket.getaddrinfo(self.hostname, self.port) 171 addresses = socket.getaddrinfo(self.hostname, self.port)
151 logging.info('[{0:d}] Resolving {1}:{2:d} Completed'.format(self.client_id , self.hostname, self.port)) 172 logging.info('[{0:d}] Resolving {1}:{2:d} Completed'.format(self.client_id , self.hostname, self.port))
152 except: 173 except:
153 addresses = () 174 addresses = ()
154 logging.info('[{0:d}] Resolving {1}:{2:d} Failed'.format(self.client_id, s elf.hostname, self.port)) 175 logging.info('[{0:d}] Resolving {1}:{2:d} Failed'.format(self.client_id, s elf.hostname, self.port))
155 message = {'message': 'resolved', 'connection': self.client_id, 'addresses': addresses} 176 message = {'message': 'resolved', 'connection': self.client_id, 'addresses': addresses}
156 self.result_pipe.SendMessage(message) 177 self.result_pipe.SendMessage(message, False)
178 lock.acquire()
179 if background_activity_count > 0:
180 background_activity_count -= 1
181 lock.release()
182 # open and close a local socket which will interrupt the long polling loop t o process the message
183 s = socket.socket()
184 s.connect((server.ipaddr, server.port))
185 s.close()
157 186
158 187
159 ################################################################################ ######################################## 188 ################################################################################ ########################################
160 # TCP Client 189 # TCP Client
161 ################################################################################ ######################################## 190 ################################################################################ ########################################
162 class TCPConnection(asyncore.dispatcher): 191 class TCPConnection(asyncore.dispatcher):
163 STATE_ERROR = -1 192 STATE_ERROR = -1
164 STATE_IDLE = 0 193 STATE_IDLE = 0
165 STATE_RESOLVING = 1 194 STATE_RESOLVING = 1
166 STATE_CONNECTING = 2 195 STATE_CONNECTING = 2
167 STATE_CONNECTED = 3 196 STATE_CONNECTED = 3
168 197
169 def __init__(self, client_id): 198 def __init__(self, client_id):
170 global options 199 global options
171 asyncore.dispatcher.__init__(self) 200 asyncore.dispatcher.__init__(self)
172 self.client_id = client_id 201 self.client_id = client_id
173 self.state = self.STATE_IDLE 202 self.state = self.STATE_IDLE
174 self.buffer = '' 203 self.buffer = ''
175 self.addr = None 204 self.addr = None
176 self.dns_thread = None 205 self.dns_thread = None
177 self.hostname = None 206 self.hostname = None
178 self.port = None 207 self.port = None
179 self.needs_config = True 208 self.needs_config = True
180 self.needs_close = False 209 self.needs_close = False
181 self.read_available = False
182 self.window_available = options.window
183 self.is_localhost = False 210 self.is_localhost = False
184 self.did_resolve = False; 211 self.did_resolve = False
185 212
186 def SendMessage(self, type, message): 213 def SendMessage(self, type, message):
187 message['message'] = type 214 message['message'] = type
188 message['connection'] = self.client_id 215 message['connection'] = self.client_id
189 in_pipe.SendMessage(message) 216 in_pipe.SendMessage(message)
190 217
191 def handle_message(self, message): 218 def handle_message(self, message):
192 if message['message'] == 'data' and 'data' in message and len(message['data' ]) and self.state == self.STATE_CONNECTED: 219 if message['message'] == 'data' and 'data' in message and len(message['data' ]):
193 if not self.needs_close: 220 self.buffer += message['data']
194 self.buffer += message['data'] 221 if self.state == self.STATE_CONNECTED:
195 self.SendMessage('ack', {}) 222 self.handle_write()
196 elif message['message'] == 'ack':
197 # Increase the congestion window by 2 packets for every packet transmitted up to 350 packets (~512KB)
198 self.window_available = min(self.window_available + 2, 350)
199 if self.read_available:
200 self.handle_read()
201 elif message['message'] == 'resolve': 223 elif message['message'] == 'resolve':
202 self.HandleResolve(message) 224 self.HandleResolve(message)
203 elif message['message'] == 'connect': 225 elif message['message'] == 'connect':
204 self.HandleConnect(message) 226 self.HandleConnect(message)
205 elif message['message'] == 'closed': 227 elif message['message'] == 'closed':
206 if len(self.buffer) == 0: 228 if len(self.buffer) == 0:
207 self.handle_close() 229 self.handle_close()
208 else: 230 else:
209 self.needs_close = True 231 self.needs_close = True
210 232
(...skipping 10 matching lines...) Expand all
221 if self.client_id in connections: 243 if self.client_id in connections:
222 if 'server' in connections[self.client_id]: 244 if 'server' in connections[self.client_id]:
223 del connections[self.client_id]['server'] 245 del connections[self.client_id]['server']
224 if 'client' in connections[self.client_id]: 246 if 'client' in connections[self.client_id]:
225 self.SendMessage('closed', {}) 247 self.SendMessage('closed', {})
226 else: 248 else:
227 del connections[self.client_id] 249 del connections[self.client_id]
228 except: 250 except:
229 pass 251 pass
230 252
231 def writable(self): 253 def handle_connect(self):
232 if self.state == self.STATE_CONNECTING: 254 if self.state == self.STATE_CONNECTING:
233 self.state = self.STATE_CONNECTED 255 self.state = self.STATE_CONNECTED
234 self.SendMessage('connected', {'success': True, 'address': self.addr}) 256 self.SendMessage('connected', {'success': True, 'address': self.addr})
235 logging.info('[{0:d}] Connected'.format(self.client_id)) 257 logging.info('[{0:d}] Connected'.format(self.client_id))
236 return (len(self.buffer) > 0 and self.state == self.STATE_CONNECTED) 258 self.handle_write()
259
260 def writable(self):
261 if self.state == self.STATE_CONNECTING:
262 return True
263 return len(self.buffer) > 0
237 264
238 def handle_write(self): 265 def handle_write(self):
239 if self.needs_config: 266 if self.needs_config:
240 self.needs_config = False 267 self.needs_config = False
241 self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 268 self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
242 sent = self.send(self.buffer) 269 if len(self.buffer) > 0:
243 logging.debug('[{0:d}] TCP => {1:d} byte(s)'.format(self.client_id, sent)) 270 sent = self.send(self.buffer)
244 self.buffer = self.buffer[sent:] 271 logging.debug('[{0:d}] TCP => {1:d} byte(s)'.format(self.client_id, sent))
245 if self.needs_close and len(self.buffer) == 0: 272 self.buffer = self.buffer[sent:]
246 self.needs_close = False 273 if self.needs_close and len(self.buffer) == 0:
247 self.handle_close() 274 self.needs_close = False
275 self.handle_close()
248 276
249 def handle_read(self): 277 def handle_read(self):
250 if self.window_available == 0:
251 self.read_available = True
252 return
253 self.read_available = False
254 try: 278 try:
255 while self.window_available > 0: 279 while True:
256 data = self.recv(1460) 280 data = self.recv(1460)
257 if data: 281 if data:
258 if self.state == self.STATE_CONNECTED: 282 if self.state == self.STATE_CONNECTED:
259 self.window_available -= 1
260 logging.debug('[{0:d}] TCP <= {1:d} byte(s)'.format(self.client_id, len(data))) 283 logging.debug('[{0:d}] TCP <= {1:d} byte(s)'.format(self.client_id, len(data)))
261 self.SendMessage('data', {'data': data}) 284 self.SendMessage('data', {'data': data})
262 else: 285 else:
263 return 286 return
264 except: 287 except:
265 pass 288 pass
266 289
267 def HandleResolve(self, message): 290 def HandleResolve(self, message):
268 global in_pipe 291 global in_pipe, map_localhost, lock, background_activity_count
269 global map_localhost
270 self.did_resolve = True 292 self.did_resolve = True
271 if 'hostname' in message: 293 if 'hostname' in message:
272 self.hostname = message['hostname'] 294 self.hostname = message['hostname']
273 self.port = 0 295 self.port = 0
274 if 'port' in message: 296 if 'port' in message:
275 self.port = message['port'] 297 self.port = message['port']
276 logging.info('[{0:d}] Resolving {1}:{2:d}'.format(self.client_id, self.hostn ame, self.port)) 298 logging.info('[{0:d}] Resolving {1}:{2:d}'.format(self.client_id, self.hostn ame, self.port))
277 if self.hostname == 'localhost': 299 if self.hostname == 'localhost':
278 self.hostname = '127.0.0.1' 300 self.hostname = '127.0.0.1'
279 if self.hostname == '127.0.0.1': 301 if self.hostname == '127.0.0.1':
280 logging.info('[{0:d}] Connection to localhost detected'.format(self.client _id)) 302 logging.info('[{0:d}] Connection to localhost detected'.format(self.client _id))
281 self.is_localhost = True 303 self.is_localhost = True
282 if (dest_addresses is not None) and (not self.is_localhost or map_localhost) : 304 if (dest_addresses is not None) and (not self.is_localhost or map_localhost) :
305 logging.info('[{0:d}] Resolving {1}:{2:d} to mapped address {3}'.format(se lf.client_id, self.hostname, self.port, dest_addresses))
283 self.SendMessage('resolved', {'addresses': dest_addresses}) 306 self.SendMessage('resolved', {'addresses': dest_addresses})
284 else: 307 else:
308 lock.acquire()
309 background_activity_count += 1
310 lock.release()
285 self.state = self.STATE_RESOLVING 311 self.state = self.STATE_RESOLVING
286 self.dns_thread = AsyncDNS(self.client_id, self.hostname, self.port, in_pi pe) 312 self.dns_thread = AsyncDNS(self.client_id, self.hostname, self.port, in_pi pe)
287 self.dns_thread.start() 313 self.dns_thread.start()
288 314
289 def HandleConnect(self, message): 315 def HandleConnect(self, message):
290 global map_localhost 316 global map_localhost
291 if 'addresses' in message and len(message['addresses']): 317 if 'addresses' in message and len(message['addresses']):
292 self.state = self.STATE_CONNECTING 318 self.state = self.STATE_CONNECTING
293 if not self.did_resolve and message['addresses'][0] == '127.0.0.1': 319 if not self.did_resolve and message['addresses'][0] == '127.0.0.1':
294 logging.info('[{0:d}] Connection to localhost detected'.format(self.clie nt_id)) 320 logging.info('[{0:d}] Connection to localhost detected'.format(self.clie nt_id))
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
354 self.state = self.STATE_WAITING_FOR_HANDSHAKE 380 self.state = self.STATE_WAITING_FOR_HANDSHAKE
355 self.ip = None 381 self.ip = None
356 self.addresses = None 382 self.addresses = None
357 self.hostname = None 383 self.hostname = None
358 self.port = None 384 self.port = None
359 self.requested_address = None 385 self.requested_address = None
360 self.buffer = '' 386 self.buffer = ''
361 self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 387 self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
362 self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1460) 388 self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1460)
363 self.needs_close = False 389 self.needs_close = False
364 self.read_available = False
365 self.window_available = options.window
366 390
367 def SendMessage(self, type, message): 391 def SendMessage(self, type, message):
368 message['message'] = type 392 message['message'] = type
369 message['connection'] = self.client_id 393 message['connection'] = self.client_id
370 out_pipe.SendMessage(message) 394 out_pipe.SendMessage(message)
371 395
372 def handle_message(self, message): 396 def handle_message(self, message):
373 if message['message'] == 'data' and 'data' in message and len(message['data' ]) and self.state == self.STATE_CONNECTED: 397 if message['message'] == 'data' and 'data' in message and len(message['data' ]) > 0:
374 if not self.needs_close: 398 self.buffer += message['data']
375 self.buffer += message['data'] 399 if self.state == self.STATE_CONNECTED:
376 self.SendMessage('ack', {}) 400 self.handle_write()
377 elif message['message'] == 'ack':
378 # Increase the congestion window by 2 packets for every packet transmitted up to 350 packets (~512KB)
379 self.window_available = min(self.window_available + 2, 350)
380 if self.read_available:
381 self.handle_read()
382 elif message['message'] == 'resolved': 401 elif message['message'] == 'resolved':
383 self.HandleResolved(message) 402 self.HandleResolved(message)
384 elif message['message'] == 'connected': 403 elif message['message'] == 'connected':
385 self.HandleConnected(message) 404 self.HandleConnected(message)
405 self.handle_write()
386 elif message['message'] == 'closed': 406 elif message['message'] == 'closed':
387 if len(self.buffer) == 0: 407 if len(self.buffer) == 0:
408 logging.info('[{0:d}] Server connection close being processed, closing B rowser connection'.format(self.client_id))
388 self.handle_close() 409 self.handle_close()
389 else: 410 else:
411 logging.info('[{0:d}] Server connection close being processed, queuing b rowser connection close'.format(self.client_id))
390 self.needs_close = True 412 self.needs_close = True
391 413
392 def writable(self): 414 def writable(self):
393 return (len(self.buffer) > 0) 415 return len(self.buffer) > 0
394 416
395 def handle_write(self): 417 def handle_write(self):
396 sent = self.send(self.buffer) 418 if len(self.buffer) > 0:
397 logging.debug('[{0:d}] SOCKS <= {1:d} byte(s)'.format(self.client_id, sent)) 419 sent = self.send(self.buffer)
398 self.buffer = self.buffer[sent:] 420 logging.debug('[{0:d}] SOCKS <= {1:d} byte(s)'.format(self.client_id, sent ))
399 if self.needs_close and len(self.buffer) == 0: 421 self.buffer = self.buffer[sent:]
400 self.needs_close = False 422 if self.needs_close and len(self.buffer) == 0:
401 self.handle_close() 423 logging.info('[{0:d}] queued browser connection close being processed, c losing Browser connection'.format(self.client_id))
424 self.needs_close = False
425 self.handle_close()
402 426
403 def handle_read(self): 427 def handle_read(self):
404 global connections 428 global connections
405 global dns_cache 429 global dns_cache
406 if self.window_available == 0:
407 self.read_available = True
408 return
409 self.read_available = False
410 try: 430 try:
411 while self.window_available > 0: 431 while True:
412 # Consume in up-to packet-sized chunks (TCP packet payload as 1460 bytes from 1500 byte ethernet frames) 432 # Consume in up-to packet-sized chunks (TCP packet payload as 1460 bytes from 1500 byte ethernet frames)
413 data = self.recv(1460) 433 data = self.recv(1460)
414 if data: 434 if data:
415 data_len = len(data) 435 data_len = len(data)
416 if self.state == self.STATE_CONNECTED: 436 if self.state == self.STATE_CONNECTED:
417 logging.debug('[{0:d}] SOCKS => {1:d} byte(s)'.format(self.client_id , data_len)) 437 logging.debug('[{0:d}] SOCKS => {1:d} byte(s)'.format(self.client_id , data_len))
418 self.window_available -= 1
419 self.SendMessage('data', {'data': data}) 438 self.SendMessage('data', {'data': data})
420 elif self.state == self.STATE_WAITING_FOR_HANDSHAKE: 439 elif self.state == self.STATE_WAITING_FOR_HANDSHAKE:
421 self.state = self.STATE_ERROR #default to an error state, set correc tly if things work out 440 self.state = self.STATE_ERROR #default to an error state, set correc tly if things work out
422 if data_len >= 2 and ord(data[0]) == 0x05: 441 if data_len >= 2 and ord(data[0]) == 0x05:
423 supports_no_auth = False 442 supports_no_auth = False
424 auth_count = ord(data[1]) 443 auth_count = ord(data[1])
425 if data_len == auth_count + 2: 444 if data_len == auth_count + 2:
426 for i in range(auth_count): 445 for i in range(auth_count):
427 offset = i + 2 446 offset = i + 2
428 if ord(data[offset]) == 0: 447 if ord(data[offset]) == 0:
429 supports_no_auth = True 448 supports_no_auth = True
430 if supports_no_auth: 449 if supports_no_auth:
431 # Respond with a message that "No Authentication" was agreed to 450 # Respond with a message that "No Authentication" was agreed to
432 logging.info('[{0:d}] New Socks5 client'.format(self.client_id)) 451 logging.info('[{0:d}] New Socks5 client'.format(self.client_id))
433 response = chr(0x05) + chr(0x00) 452 response = chr(0x05) + chr(0x00)
434 self.state = self.STATE_WAITING_FOR_CONNECT_REQUEST 453 self.state = self.STATE_WAITING_FOR_CONNECT_REQUEST
435 self.buffer += response 454 self.buffer += response
455 self.handle_write()
436 elif self.state == self.STATE_WAITING_FOR_CONNECT_REQUEST: 456 elif self.state == self.STATE_WAITING_FOR_CONNECT_REQUEST:
437 self.state = self.STATE_ERROR #default to an error state, set correc tly if things work out 457 self.state = self.STATE_ERROR #default to an error state, set correc tly if things work out
438 if data_len >= 10 and ord(data[0]) == 0x05 and ord(data[2]) == 0x00: 458 if data_len >= 10 and ord(data[0]) == 0x05 and ord(data[2]) == 0x00:
439 if ord(data[1]) == 0x01: #TCP connection (only supported method fo r now) 459 if ord(data[1]) == 0x01: #TCP connection (only supported method fo r now)
440 connections[self.client_id]['server'] = TCPConnection(self.clien t_id) 460 connections[self.client_id]['server'] = TCPConnection(self.clien t_id)
441 self.requested_address = data[3:] 461 self.requested_address = data[3:]
442 port_offset = 0 462 port_offset = 0
443 if ord(data[3]) == 0x01: 463 if ord(data[3]) == 0x01:
444 port_offset = 8 464 port_offset = 8
445 self.ip = '{0:d}.{1:d}.{2:d}.{3:d}'.format(ord(data[4]), ord(dat a[5]), ord(data[6]), ord(data[7])) 465 self.ip = '{0:d}.{1:d}.{2:d}.{3:d}'.format(ord(data[4]), ord(dat a[5]), ord(data[6]), ord(data[7]))
(...skipping 15 matching lines...) Expand all
461 if self.ip is None and self.hostname is not None: 481 if self.ip is None and self.hostname is not None:
462 if self.hostname in dns_cache: 482 if self.hostname in dns_cache:
463 self.state = self.STATE_CONNECTING 483 self.state = self.STATE_CONNECTING
464 self.addresses = dns_cache[self.hostname] 484 self.addresses = dns_cache[self.hostname]
465 self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port}) 485 self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
466 else: 486 else:
467 self.state = self.STATE_RESOLVING 487 self.state = self.STATE_RESOLVING
468 self.SendMessage('resolve', {'hostname': self.hostname, 'p ort': self.port}) 488 self.SendMessage('resolve', {'hostname': self.hostname, 'p ort': self.port})
469 elif self.ip is not None: 489 elif self.ip is not None:
470 self.state = self.STATE_CONNECTING 490 self.state = self.STATE_CONNECTING
491 logging.debug('[{0:d}] Socks Connect - calling getaddrinfo f or {1}:{2:d}'.format(self.client_id, self.ip, self.port))
471 self.addresses = socket.getaddrinfo(self.ip, self.port) 492 self.addresses = socket.getaddrinfo(self.ip, self.port)
472 self.SendMessage('connect', {'addresses': self.addresses, 'p ort': self.port}) 493 self.SendMessage('connect', {'addresses': self.addresses, 'p ort': self.port})
473 else: 494 else:
474 return 495 return
475 except: 496 except:
476 pass 497 pass
477 498
478 def handle_close(self): 499 def handle_close(self):
479 logging.info('[{0:d}] Browser Connection Closed'.format(self.client_id)) 500 logging.info('[{0:d}] Browser Connection Closed by browser'.format(self.clie nt_id))
480 self.state = self.STATE_ERROR 501 self.state = self.STATE_ERROR
481 self.close() 502 self.close()
482 try: 503 try:
483 if self.client_id in connections: 504 if self.client_id in connections:
484 if 'client' in connections[self.client_id]: 505 if 'client' in connections[self.client_id]:
485 del connections[self.client_id]['client'] 506 del connections[self.client_id]['client']
486 if 'server' in connections[self.client_id]: 507 if 'server' in connections[self.client_id]:
487 self.SendMessage('closed', {}) 508 self.SendMessage('closed', {})
488 else: 509 else:
489 del connections[self.client_id] 510 del connections[self.client_id]
490 except: 511 except:
491 pass 512 pass
492 513
493 def HandleResolved(self, message): 514 def HandleResolved(self, message):
494 global dns_cache 515 global dns_cache
495 if self.state == self.STATE_RESOLVING: 516 if self.state == self.STATE_RESOLVING:
496 if 'addresses' in message and len(message['addresses']): 517 if 'addresses' in message and len(message['addresses']):
497 self.state = self.STATE_CONNECTING 518 self.state = self.STATE_CONNECTING
498 self.addresses = message['addresses'] 519 self.addresses = message['addresses']
499 dns_cache[self.hostname] = self.addresses 520 dns_cache[self.hostname] = self.addresses
500 logging.debug('[{0:d}] Resolved {1}, Connecting'.format(self.client_id, self.hostname)) 521 logging.debug('[{0:d}] Resolved {1}, Connecting'.format(self.client_id, self.hostname))
501 self.SendMessage('connect', {'addresses': self.addresses, 'port': self.p ort}) 522 self.SendMessage('connect', {'addresses': self.addresses, 'port': self.p ort})
502 else: 523 else:
503 # Send host unreachable error 524 # Send host unreachable error
504 self.state = self.STATE_ERROR 525 self.state = self.STATE_ERROR
505 self.buffer += chr(0x05) + chr(0x04) + self.requested_address 526 self.buffer += chr(0x05) + chr(0x04) + self.requested_address
527 self.handle_write()
506 528
507 def HandleConnected(self, message): 529 def HandleConnected(self, message):
508 if 'success' in message and self.state == self.STATE_CONNECTING: 530 if 'success' in message and self.state == self.STATE_CONNECTING:
509 response = chr(0x05) 531 response = chr(0x05)
510 if message['success']: 532 if message['success']:
511 response += chr(0x00) 533 response += chr(0x00)
512 logging.debug('[{0:d}] Connected to {1}'.format(self.client_id, self.hos tname)) 534 logging.debug('[{0:d}] Connected to {1}'.format(self.client_id, self.hos tname))
513 self.state = self.STATE_CONNECTED 535 self.state = self.STATE_CONNECTED
514 else: 536 else:
515 response += chr(0x04) 537 response += chr(0x04)
516 self.state = self.STATE_ERROR 538 self.state = self.STATE_ERROR
517 response += chr(0x00) 539 response += chr(0x00)
518 response += self.requested_address 540 response += self.requested_address
519 self.buffer += response 541 self.buffer += response
542 self.handle_write()
520 543
521 544
522 ################################################################################ ######################################## 545 ################################################################################ ########################################
523 # stdin command processor 546 # stdin command processor
524 ################################################################################ ######################################## 547 ################################################################################ ########################################
525 class CommandProcessor(): 548 class CommandProcessor():
526 def __init__(self): 549 def __init__(self):
527 thread = threading.Thread(target = self.run, args=()) 550 thread = threading.Thread(target = self.run, args=())
528 thread.daemon = True 551 thread.daemon = True
529 thread.start() 552 thread.start()
530 553
531 def run(self): 554 def run(self):
532 global must_exit 555 global must_exit
533 while not must_exit: 556 while not must_exit:
534 for line in iter(sys.stdin.readline, ''): 557 for line in iter(sys.stdin.readline, ''):
535 self.ProcessCommand(line.strip()) 558 self.ProcessCommand(line.strip())
536 559
537 def ProcessCommand(self, input): 560 def ProcessCommand(self, input):
538 global in_pipe 561 global in_pipe
539 global out_pipe 562 global out_pipe
540 global needs_flush 563 global needs_flush
541 global REMOVE_TCP_OVERHEAD 564 global REMOVE_TCP_OVERHEAD
565 global port_mappings
566 global server
542 if len(input): 567 if len(input):
543 ok = False 568 ok = False
544 try: 569 try:
545 command = input.split() 570 command = input.split()
546 if len(command) and len(command[0]): 571 if len(command) and len(command[0]):
547 if command[0].lower() == 'flush': 572 if command[0].lower() == 'flush':
573 ok = True
574 elif command[0].lower() == 'set' and len(command) >= 3:
575 if command[1].lower() == 'rtt' and len(command[2]):
576 rtt = float(command[2])
577 latency = rtt / 2000.0
578 in_pipe.latency = latency
579 out_pipe.latency = latency
580 ok = True
581 elif command[1].lower() == 'inkbps' and len(command[2]):
582 in_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
583 ok = True
584 elif command[1].lower() == 'outkbps' and len(command[2]):
585 out_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
586 ok = True
587 elif command[1].lower() == 'mapports' and len(command[2]):
588 SetPortMappings(command[2])
589 ok = True
590 elif command[0].lower() == 'reset' and len(command) >= 2:
591 if command[1].lower() == 'rtt' or command[1].lower() == 'all':
592 in_pipe.latency = 0
593 out_pipe.latency = 0
594 ok = True
595 if command[1].lower() == 'inkbps' or command[1].lower() == 'all':
596 in_pipe.kbps = 0
597 ok = True
598 if command[1].lower() == 'outkbps' or command[1].lower() == 'all':
599 out_pipe.kbps = 0
600 ok = True
601 if command[1].lower() == 'mapports' or command[1].lower() == 'all':
602 port_mappings = {}
603 ok = True
604
605 if ok:
548 needs_flush = True 606 needs_flush = True
549 ok = True
550 elif len(command) >= 3 and command[0].lower() == 'set' and command[1]. lower() == 'rtt' and len(command[2]):
551 rtt = float(command[2])
552 latency = rtt / 2000.0
553 in_pipe.latency = latency
554 out_pipe.latency = latency
555 needs_flush = True
556 ok = True
557 elif len(command) >= 3 and command[0].lower() == 'set' and command[1]. lower() == 'inkbps' and len(command[2]):
558 in_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
559 needs_flush = True
560 ok = True
561 elif len(command) >= 3 and command[0].lower() == 'set' and command[1]. lower() == 'outkbps' and len(command[2]):
562 out_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
563 needs_flush = True
564 ok = True
565 elif len(command) >= 3 and command[0].lower() == 'set' and command[1]. lower() == 'mapports' and len(command[2]):
566 SetPortMappings(command[2])
567 needs_flush = True
568 ok = True
569 except: 607 except:
570 pass 608 pass
571 if not ok: 609 if not ok:
572 PrintMessage('ERROR') 610 PrintMessage('ERROR')
611 # open and close a local socket which will interrupt the long polling loop to process the flush
612 if needs_flush:
613 s = socket.socket()
614 s.connect((server.ipaddr, server.port))
615 s.close()
573 616
574 617
575 ################################################################################ ######################################## 618 ################################################################################ ########################################
576 # Main Entry Point 619 # Main Entry Point
577 ################################################################################ ######################################## 620 ################################################################################ ########################################
578 def main(): 621 def main():
579 global server 622 global server
580 global options 623 global options
581 global in_pipe 624 global in_pipe
582 global out_pipe 625 global out_pipe
(...skipping 30 matching lines...) Expand all
613 logging.basicConfig(level=log_level, format="%(asctime)s.%(msecs)03d - %(messa ge)s", datefmt="%H:%M:%S") 656 logging.basicConfig(level=log_level, format="%(asctime)s.%(msecs)03d - %(messa ge)s", datefmt="%H:%M:%S")
614 657
615 # Parse any port mappings 658 # Parse any port mappings
616 if options.mapports: 659 if options.mapports:
617 SetPortMappings(options.mapports) 660 SetPortMappings(options.mapports)
618 661
619 map_localhost = options.localhost 662 map_localhost = options.localhost
620 663
621 # Resolve the address for a rewrite destination host if one was specified 664 # Resolve the address for a rewrite destination host if one was specified
622 if options.desthost: 665 if options.desthost:
666 logging.debug('Startup - calling getaddrinfo for {0}:{1:d}'.format(options.d esthost, GetDestPort(80)))
623 dest_addresses = socket.getaddrinfo(options.desthost, GetDestPort(80)) 667 dest_addresses = socket.getaddrinfo(options.desthost, GetDestPort(80))
624 668
625 # Set up the pipes. 1/2 of the latency gets applied in each direction (and /1 000 to convert to seconds) 669 # Set up the pipes. 1/2 of the latency gets applied in each direction (and /1 000 to convert to seconds)
626 in_pipe = TSPipe(TSPipe.PIPE_IN, options.rtt / 2000.0, options.inkbps * REMOVE _TCP_OVERHEAD) 670 in_pipe = TSPipe(TSPipe.PIPE_IN, options.rtt / 2000.0, options.inkbps * REMOVE _TCP_OVERHEAD)
627 out_pipe = TSPipe(TSPipe.PIPE_OUT, options.rtt / 2000.0, options.outkbps * REM OVE_TCP_OVERHEAD) 671 out_pipe = TSPipe(TSPipe.PIPE_OUT, options.rtt / 2000.0, options.outkbps * REM OVE_TCP_OVERHEAD)
628 672
629 signal.signal(signal.SIGINT, signal_handler) 673 signal.signal(signal.SIGINT, signal_handler)
630 server = Socks5Server(options.bind, options.port) 674 server = Socks5Server(options.bind, options.port)
631 command_processor = CommandProcessor() 675 command_processor = CommandProcessor()
632 PrintMessage('Started Socks5 proxy server on {0}:{1:d}\nHit Ctrl-C to exit.'.f ormat(server.ipaddr, server.port)) 676 PrintMessage('Started Socks5 proxy server on {0}:{1:d}\nHit Ctrl-C to exit.'.f ormat(server.ipaddr, server.port))
633 run_loop() 677 run_loop()
634 678
635 def signal_handler(signal, frame): 679 def signal_handler(signal, frame):
636 global server 680 global server
637 global must_exit 681 global must_exit
638 logging.error('Exiting...') 682 logging.error('Exiting...')
639 must_exit = True 683 must_exit = True
640 del server 684 del server
641 685
642 686
643 # Wrapper around the asyncore loop that lets us poll the in/out pipes every 1ms 687 # Wrapper around the asyncore loop that lets us poll the in/out pipes every 1ms
644 def run_loop(): 688 def run_loop():
645 global must_exit 689 global must_exit
646 global in_pipe 690 global in_pipe
647 global out_pipe 691 global out_pipe
648 global needs_flush 692 global needs_flush
649 global flush_pipes 693 global flush_pipes
650 gc_check_count = 0 694 global last_activity
695 winmm = None
696
697 # increase the windows timer resolution to 1ms
698 if platform.system() == "Windows":
699 try:
700 import ctypes
701 winmm = ctypes.WinDLL('winmm')
702 winmm.timeBeginPeriod(1)
703 except:
704 pass
705
651 last_activity = time.clock() 706 last_activity = time.clock()
707 last_check = time.clock()
652 # disable gc to avoid pauses during traffic shaping/proxying 708 # disable gc to avoid pauses during traffic shaping/proxying
653 gc.disable() 709 gc.disable()
654 while not must_exit: 710 while not must_exit:
655 asyncore.poll(0.001, asyncore.socket_map) 711 # Tick every 1ms if traffic-shaping is enabled and we have data or are doing background dns lookups, every 1 second otherwise
712 lock.acquire()
713 tick_interval = 0.001
714 if background_activity_count == 0:
715 if in_pipe.next_message is None and in_pipe.queue.empty() and out_pipe.nex t_message is None and out_pipe.queue.empty():
716 tick_interval = 1.0
717 elif in_pipe.kbps == .0 and in_pipe.latency == 0 and out_pipe.kbps == .0 a nd out_pipe.latency == 0:
718 tick_interval = 1.0
719 lock.release()
720 asyncore.poll(tick_interval, asyncore.socket_map)
656 if needs_flush: 721 if needs_flush:
657 flush_pipes = True 722 flush_pipes = True
658 needs_flush = False 723 needs_flush = False
659 if in_pipe.tick(): 724 out_pipe.tick()
660 last_activity = time.clock() 725 in_pipe.tick()
661 if out_pipe.tick():
662 last_activity = time.clock()
663 if flush_pipes: 726 if flush_pipes:
664 PrintMessage('OK') 727 PrintMessage('OK')
665 flush_pipes = False 728 flush_pipes = False
666 # Every 500 loops (~0.5 second) check to see if it is a good time to do a gc 729 # Every 500 ms check to see if it is a good time to do a gc
667 if gc_check_count > 1000: 730 now = time.clock()
668 gc_check_count = 0 731 if now - last_check > 0.5:
732 last_check = now
669 # manually gc after 5 seconds of idle 733 # manually gc after 5 seconds of idle
670 if time.clock() - last_activity >= 5: 734 if now - last_activity >= 5:
671 last_activity = time.clock() 735 last_activity = now
672 logging.debug("Triggering manual GC") 736 logging.debug("Triggering manual GC")
673 gc.collect() 737 gc.collect()
674 else:
675 gc_check_count += 1
676 738
739 if winmm is not None:
740 winmm.timeEndPeriod(1)
677 741
678 def GetDestPort(port): 742 def GetDestPort(port):
679 global port_mappings 743 global port_mappings
680 if port_mappings is not None: 744 if port_mappings is not None:
681 src_port = str(port) 745 src_port = str(port)
682 if src_port in port_mappings: 746 if src_port in port_mappings:
683 return port_mappings[src_port] 747 return port_mappings[src_port]
684 elif 'default' in port_mappings: 748 elif 'default' in port_mappings:
685 return port_mappings['default'] 749 return port_mappings['default']
686 return port 750 return port
687 751
688 752
689 def SetPortMappings(map_string): 753 def SetPortMappings(map_string):
690 global port_mappings 754 global port_mappings
691 port_mappings = {} 755 port_mappings = {}
692 map_string = map_string.strip('\'" \t\r\n') 756 map_string = map_string.strip('\'" \t\r\n')
693 for pair in map_string.split(','): 757 for pair in map_string.split(','):
694 (src, dest) = pair.split(':') 758 (src, dest) = pair.split(':')
695 if src == '*': 759 if src == '*':
696 port_mappings['default'] = int(dest) 760 port_mappings['default'] = int(dest)
697 logging.debug("Default port mapped to port {0}".format(dest)) 761 logging.debug("Default port mapped to port {0}".format(dest))
698 else: 762 else:
699 logging.debug("Port {0} mapped to port {1}".format(src, dest)) 763 logging.debug("Port {0} mapped to port {1}".format(src, dest))
700 port_mappings[src] = int(dest) 764 port_mappings[src] = int(dest)
701 765
702 766
703 if '__main__' == __name__: 767 if '__main__' == __name__:
704 main() 768 main()
OLDNEW
« no previous file with comments | « telemetry/third_party/tsproxy/README.md ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698