| OLD | NEW |
| (Empty) | |
| 1 #!/usr/bin/env python |
| 2 |
| 3 __copyright__ = """\ |
| 4 Copyright (c) 2008-2009 Mark Nottingham |
| 5 |
| 6 Permission is hereby granted, free of charge, to any person obtaining a copy |
| 7 of this software and associated documentation files (the "Software"), to deal |
| 8 in the Software without restriction, including without limitation the rights |
| 9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| 10 copies of the Software, and to permit persons to whom the Software is |
| 11 furnished to do so, subject to the following conditions: |
| 12 |
| 13 The above copyright notice and this permission notice shall be included in |
| 14 all copies or substantial portions of the Software. |
| 15 |
| 16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| 17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| 18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| 19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| 20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| 22 THE SOFTWARE. |
| 23 """ |
| 24 |
| 25 """ |
| 26 Non-Blocking SPDY Client |
| 27 |
| 28 This library allow implementation of an HTTP/1.1 client that is "non-blocking," |
| 29 "asynchronous" and "event-driven" -- i.e., it achieves very high performance |
| 30 and concurrency, so long as the application code does not block (e.g., |
| 31 upon network, disk or database access). Blocking on one response will block |
| 32 the entire client. |
| 33 |
| 34 Instantiate a Client with the following parameter: |
| 35 - res_start (callable) |
| 36 |
| 37 Call req_start on the Client instance to begin a request. It takes the following
|
| 38 arguments: |
| 39 - method (string) |
| 40 - uri (string) |
| 41 - req_hdrs (list of (name, value) tuples) |
| 42 - req_body_pause (callable) |
| 43 and returns: |
| 44 - req_body (callable) |
| 45 - req_done (callable) |
| 46 |
| 47 Call req_body to send part of the request body. It takes the following |
| 48 argument: |
| 49 - chunk (string) |
| 50 |
| 51 Call req_done when the request is complete, whether or not it contains a |
| 52 body. It takes the following argument: |
| 53 - err (error dictionary, or None for no error) |
| 54 |
| 55 req_body_pause is called when the client needs you to temporarily stop sending |
| 56 the request body, or restart. It must take the following argument: |
| 57 - paused (boolean; True means pause, False means unpause) |
| 58 |
| 59 res_start is called to start the response, and must take the following |
| 60 arguments: |
| 61 - status_code (string) |
| 62 - status_phrase (string) |
| 63 - res_hdrs (list of (name, value) tuples) |
| 64 - res_body_pause |
| 65 It must return: |
| 66 - res_body (callable) |
| 67 - res_done (callable) |
| 68 |
| 69 res_body is called when part of the response body is available. It must accept |
| 70 the following parameter: |
| 71 - chunk (string) |
| 72 |
| 73 res_done is called when the response is finished, and must accept the |
| 74 following argument: |
| 75 - err (error dictionary, or None if no error) |
| 76 |
| 77 See the error module for the complete list of valid error dictionaries. |
| 78 |
| 79 Where possible, errors in the response will be indicated with the appropriate |
| 80 5xx HTTP status code (i.e., by calling res_start, res_body and res_done with |
| 81 an error dictionary). However, if a response has already been started, the |
| 82 connection will be dropped (for example, when the response chunking or |
| 83 indicated length are incorrect). In these cases, res_done will still be called |
| 84 with the appropriate error dictionary. |
| 85 """ |
| 86 |
| 87 # FIXME: update docs for API change (move res_start) |
| 88 |
| 89 __author__ = "Mark Nottingham <mnot@mnot.net>" |
| 90 |
| 91 from urlparse import urlsplit |
| 92 |
| 93 import push_tcp |
| 94 from error import ERR_CONNECT, ERR_URL |
| 95 from http_common import WAITING, \ |
| 96 hop_by_hop_hdrs, dummy, get_hdr |
| 97 from spdy_common import SpdyMessageHandler, CTL_SYN_STREAM, FLAG_NONE, FLAG_FIN |
| 98 |
| 99 req_remove_hdrs = hop_by_hop_hdrs + ['host'] |
| 100 |
| 101 # TODO: read timeout support (needs to be in push_tcp?) |
| 102 |
| 103 class SpdyClient(SpdyMessageHandler): |
| 104 "An asynchronous SPDY client." |
| 105 proxy = None |
| 106 connect_timeout = None |
| 107 |
| 108 def req_start(self, method, uri, req_hdrs, res_start_cb, req_body_pause): |
| 109 """ |
| 110 Start a request to uri using method, where |
| 111 req_hdrs is a list of (field_name, field_value) for |
| 112 the request headers. |
| 113 |
| 114 Returns a (req_body, req_done) tuple. |
| 115 """ |
| 116 if self.proxy: |
| 117 (host, port) = self.proxy |
| 118 else: # find out where to connect to the hard way |
| 119 (scheme, authority, path, query, fragment) = urlsplit(uri) |
| 120 if scheme.lower() != 'http': |
| 121 self._handle_error(ERR_URL, "Only HTTP URLs are supported") |
| 122 return dummy, dummy |
| 123 if "@" in authority: |
| 124 userinfo, authority = authority.split("@", 1) |
| 125 if ":" in authority: |
| 126 host, port = authority.rsplit(":", 1) |
| 127 try: |
| 128 port = int(port) |
| 129 except ValueError: |
| 130 self._handle_error(ERR_URL, "Non-integer port in URL") |
| 131 return dummy, dummy |
| 132 else: |
| 133 host, port = authority, 80 |
| 134 conn = _conn_pool.get(host, port, SpdyConnection, self.connect_timeout) |
| 135 return conn.req_start(method, uri, req_hdrs, res_start_cb, req_body_paus
e) |
| 136 |
| 137 |
| 138 class SpdyConnection(SpdyMessageHandler): |
| 139 "A SPDY connection." |
| 140 |
| 141 def __init__(self, log=None): |
| 142 SpdyMessageHandler.__init__(self) |
| 143 self.log = log or dummy |
| 144 self._tcp_conn = None |
| 145 self._req_body_pause_cb = None # FIXME: re-think pausing |
| 146 self._streams = {} |
| 147 self._output_buffer = [] |
| 148 self._highest_stream_id = -1 |
| 149 |
| 150 def req_start(self, method, uri, req_hdrs, res_start_cb, req_body_pause): |
| 151 req_hdrs = [i for i in req_hdrs if not i[0].lower() in req_remove_hdrs] |
| 152 req_hdrs.append(('method', method)) |
| 153 req_hdrs.append(('url', uri)) |
| 154 req_hdrs.append(('version', 'HTTP/1.1')) |
| 155 self._highest_stream_id += 2 # TODO: check to make sure it's not too hig
h.. what then? |
| 156 stream_id = self._highest_stream_id |
| 157 self._streams[stream_id] = [res_start_cb, req_body_pause, None, None] |
| 158 self._output(self._ser_syn_frame(CTL_SYN_STREAM, FLAG_NONE, stream_id, r
eq_hdrs)) |
| 159 def req_body(*args): |
| 160 return self.req_body(stream_id, *args) |
| 161 def req_done(*args): |
| 162 return self.req_done(stream_id, *args) |
| 163 return req_body, req_done |
| 164 |
| 165 def req_body(self, stream_id, chunk): |
| 166 "Send part of the request body. May be called zero to many times." |
| 167 self._output(self._ser_data_frame(stream_id, FLAG_NONE, chunk)) |
| 168 |
| 169 def req_done(self, stream_id, err): |
| 170 """ |
| 171 Signal the end of the request, whether or not there was a body. MUST be |
| 172 called exactly once for each request. |
| 173 |
| 174 If err is not None, it is an error dictionary (see the error module) |
| 175 indicating that an HTTP-specific (i.e., non-application) error occurred |
| 176 while satisfying the request; this is useful for debugging. |
| 177 """ |
| 178 self._output(self._ser_data_frame(stream_id, FLAG_FIN, "")) |
| 179 # TODO: delete stream after checking that input side is half-closed |
| 180 |
| 181 def res_body_pause(self, paused): |
| 182 "Temporarily stop / restart sending the response body." |
| 183 if self._tcp_conn and self._tcp_conn.tcp_connected: |
| 184 self._tcp_conn.pause(paused) |
| 185 |
| 186 # Methods called by push_tcp |
| 187 |
| 188 def handle_connect(self, tcp_conn): |
| 189 "The connection has succeeded." |
| 190 self._tcp_conn = tcp_conn |
| 191 self._output("") # kick the output buffer |
| 192 return self._handle_input, self._conn_closed, self._req_body_pause |
| 193 |
| 194 def handle_connect_error(self, host, port, err): |
| 195 "The connection has failed." |
| 196 import os, types, socket |
| 197 if type(err) == types.IntType: |
| 198 err = os.strerror(err) |
| 199 elif isinstance(err, socket.error): |
| 200 err = err[1] |
| 201 else: |
| 202 err = str(err) |
| 203 self._handle_error(ERR_CONNECT, err) |
| 204 |
| 205 def _conn_closed(self): |
| 206 "The server closed the connection." |
| 207 if self._input_buffer: |
| 208 self._handle_input("") |
| 209 # TODO: figure out what to do with existing conns |
| 210 |
| 211 def _req_body_pause(self, paused): |
| 212 "The client needs the application to pause/unpause the request body." |
| 213 # FIXME: figure out how pausing should work. |
| 214 if self._req_body_pause_cb: |
| 215 self._req_body_pause_cb(paused) |
| 216 |
| 217 # Methods called by common.SpdyMessageHandler |
| 218 |
| 219 def _input_start(self, stream_id, stream_priority, hdr_tuples): |
| 220 """ |
| 221 Take the top set of headers from the input stream, parse them |
| 222 and queue the request to be processed by the application. |
| 223 """ |
| 224 status = get_hdr(hdr_tuples, 'status')[0] |
| 225 try: |
| 226 res_code, res_phrase = status.split(None, 1) |
| 227 except ValueError: |
| 228 res_code = status.rstrip() |
| 229 res_phrase = "" |
| 230 self._streams[stream_id][1:2] = self._streams[stream_id][0]( |
| 231 "HTTP/1.1", res_code, res_phrase, hdr_tuples, self.res_body_pause) |
| 232 |
| 233 def _input_body(self, stream_id, chunk): |
| 234 "Process a response body chunk from the wire." |
| 235 self._streams[stream_id][1](chunk) |
| 236 |
| 237 def _input_end(self, stream_id): |
| 238 "Indicate that the response body is complete." |
| 239 self._streams[stream_id][2](None) |
| 240 # TODO: delete stream if output side is half-closed. |
| 241 |
| 242 def _input_error(self, err, detail=None): |
| 243 "Indicate a parsing problem with the response body." |
| 244 if self._tcp_conn: |
| 245 self._tcp_conn.close() |
| 246 self._tcp_conn = None |
| 247 err['detail'] = detail |
| 248 self.res_done_cb(err) |
| 249 |
| 250 def _output(self, chunk): |
| 251 self._output_buffer.append(chunk) |
| 252 if self._tcp_conn and self._tcp_conn.tcp_connected: |
| 253 self._tcp_conn.write("".join(self._output_buffer)) |
| 254 self._output_buffer = [] |
| 255 |
| 256 # misc |
| 257 |
| 258 def _handle_error(self, err, detail=None): |
| 259 "Handle a problem with the request by generating an appropriate response
." |
| 260 assert self._input_state == WAITING |
| 261 if self._tcp_conn: |
| 262 self._tcp_conn.close() |
| 263 self._tcp_conn = None |
| 264 if detail: |
| 265 err['detail'] = detail |
| 266 status_code, status_phrase = err.get('status', ('504', 'Gateway Timeout'
)) |
| 267 hdrs = [ |
| 268 ('Content-Type', 'text/plain'), |
| 269 ('Connection', 'close'), |
| 270 ] |
| 271 body = err['desc'] |
| 272 if err.has_key('detail'): |
| 273 body += " (%s)" % err['detail'] |
| 274 res_body_cb, res_done_cb = self.res_start_cb( |
| 275 "1.1", status_code, status_phrase, hdrs, dummy) |
| 276 res_body_cb(str(body)) |
| 277 push_tcp.schedule(0, res_done_cb, err) |
| 278 |
| 279 |
| 280 class _SpdyConnectionPool: |
| 281 "A pool of open connections for use by the client." |
| 282 _conns = {} |
| 283 |
| 284 def get(self, host, port, connection_handler, connect_timeout): |
| 285 "Find a connection for (host, port), or create a new one." |
| 286 try: |
| 287 conn = self._conns[(host, port)] |
| 288 except KeyError: |
| 289 conn = connection_handler() |
| 290 push_tcp.create_client( |
| 291 host, port, |
| 292 conn.handle_connect, conn.handle_connect_error, |
| 293 connect_timeout |
| 294 ) |
| 295 self._conns[(host, port)] = conn |
| 296 return conn |
| 297 |
| 298 #TODO: remove conns from _conns when they close |
| 299 |
| 300 _conn_pool = _SpdyConnectionPool() |
| 301 |
| 302 |
| 303 def test_client(request_uri): |
| 304 "A simple demonstration of a client." |
| 305 def printer(version, status, phrase, headers, res_pause): |
| 306 "Print the response headers." |
| 307 print "HTTP/%s" % version, status, phrase |
| 308 print "\n".join(["%s:%s" % header for header in headers]) |
| 309 print |
| 310 def body(chunk): |
| 311 print chunk |
| 312 def done(err): |
| 313 if err: |
| 314 print "*** ERROR: %s (%s)" % (err['desc'], err['detail']) |
| 315 push_tcp.stop() |
| 316 return body, done |
| 317 c = SpdyClient() |
| 318 req_body_write, req_done = c.req_start("GET", request_uri, [], printer, dumm
y) |
| 319 req_done(None) |
| 320 push_tcp.run() |
| 321 |
| 322 if __name__ == "__main__": |
| 323 import sys |
| 324 test_client(sys.argv[1]) |
| OLD | NEW |