| OLD | NEW |
| (Empty) | |
| 1 #!/usr/bin/env python |
| 2 |
| 3 __copyright__ = """\ |
| 4 Copyright (c) 2008-2009 Mark Nottingham |
| 5 |
| 6 Permission is hereby granted, free of charge, to any person obtaining a copy |
| 7 of this software and associated documentation files (the "Software"), to deal |
| 8 in the Software without restriction, including without limitation the rights |
| 9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| 10 copies of the Software, and to permit persons to whom the Software is |
| 11 furnished to do so, subject to the following conditions: |
| 12 |
| 13 The above copyright notice and this permission notice shall be included in |
| 14 all copies or substantial portions of the Software. |
| 15 |
| 16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| 17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| 18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| 19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| 20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| 22 THE SOFTWARE. |
| 23 """ |
| 24 |
| 25 """ |
| 26 Non-Blocking SPDY Server |
| 27 |
| 28 This library allow implementation of an SPDY server that is "non-blocking," |
| 29 "asynchronous" and "event-driven" -- i.e., it achieves very high performance |
| 30 and concurrency, so long as the application code does not block (e.g., |
| 31 upon network, disk or database access). Blocking on one request will block |
| 32 the entire server. |
| 33 |
| 34 Instantiate a Server with the following parameters: |
| 35 - host (string) |
| 36 - port (int) |
| 37 - req_start (callable) |
| 38 |
| 39 req_start is called when a request starts. It must take the following arguments: |
| 40 - method (string) |
| 41 - uri (string) |
| 42 - req_hdrs (list of (name, value) tuples) |
| 43 - res_start (callable) |
| 44 - req_body_pause (callable) |
| 45 and return: |
| 46 - req_body (callable) |
| 47 - req_done (callable) |
| 48 |
| 49 req_body is called when part of the request body is available. It must take the |
| 50 following argument: |
| 51 - chunk (string) |
| 52 |
| 53 req_done is called when the request is complete, whether or not it contains a |
| 54 body. It must take the following argument: |
| 55 - err (error dictionary, or None for no error) |
| 56 |
| 57 Call req_body_pause when you want the server to temporarily stop sending the |
| 58 request body, or restart. You must provide the following argument: |
| 59 - paused (boolean; True means pause, False means unpause) |
| 60 |
| 61 Call res_start when you want to start the response, and provide the following |
| 62 arguments: |
| 63 - status_code (string) |
| 64 - status_phrase (string) |
| 65 - res_hdrs (list of (name, value) tuples) |
| 66 - res_body_pause |
| 67 It returns: |
| 68 - res_body (callable) |
| 69 - res_done (callable) |
| 70 |
| 71 Call res_body to send part of the response body to the client. Provide the |
| 72 following parameter: |
| 73 - chunk (string) |
| 74 |
| 75 Call res_done when the response is finished, and provide the |
| 76 following argument if appropriate: |
| 77 - err (error dictionary, or None for no error) |
| 78 |
| 79 See the error module for the complete list of valid error dictionaries. |
| 80 |
| 81 Where possible, errors in the request will be responded to with the appropriate |
| 82 4xx HTTP status code. However, if a response has already been started, the |
| 83 connection will be dropped (for example, when the request chunking or |
| 84 indicated length are incorrect). |
| 85 """ |
| 86 |
| 87 __author__ = "Mark Nottingham <mnot@mnot.net>" |
| 88 |
| 89 import os |
| 90 import sys |
| 91 import logging |
| 92 |
| 93 import push_tcp |
| 94 from spdy_common import SpdyMessageHandler, CTL_SYN_REPLY, FLAG_NONE, FLAG_FIN |
| 95 from http_common import get_hdr, dummy |
| 96 |
| 97 # FIXME: assure that the connection isn't closed before reading the entire req b
ody |
| 98 |
| 99 class SpdyServer: |
| 100 "An asynchronous SPDY server." |
| 101 def __init__(self, |
| 102 host, |
| 103 port, |
| 104 use_ssl, |
| 105 certfile, |
| 106 keyfile, |
| 107 request_handler, |
| 108 log=None): |
| 109 self.request_handler = request_handler |
| 110 self.use_ssl = use_ssl |
| 111 self.server = push_tcp.create_server(host, port, use_ssl, certfile, keyf
ile, self.handle_connection) |
| 112 self.log = log |
| 113 |
| 114 def handle_connection(self, tcp_conn): |
| 115 "Process a new push_tcp connection, tcp_conn." |
| 116 conn = SpdyServerConnection(self.request_handler, tcp_conn, self.log) |
| 117 return conn._handle_input, conn._conn_closed, conn._res_body_pause |
| 118 |
| 119 |
| 120 class SpdyServerConnection(SpdyMessageHandler): |
| 121 "A handler for a SPDY server connection." |
| 122 def __init__(self, request_handler, tcp_conn, log=None): |
| 123 SpdyMessageHandler.__init__(self) |
| 124 self.request_handler = request_handler |
| 125 self._tcp_conn = tcp_conn |
| 126 self.log = log or dummy |
| 127 self._streams = {} |
| 128 self._res_body_pause_cb = False |
| 129 self.log.debug("new connection %s" % id(self)) |
| 130 # SPDY has 4 priorities. write_queue is an array of [0..3], one for eac
h priority. |
| 131 self.write_queue = [] |
| 132 for index in range(0,4): |
| 133 self.write_queue.append([]) |
| 134 # Write pending when a write to the output queue has been scheduled |
| 135 self.write_pending = False |
| 136 |
| 137 def res_start(self, stream_id, stream_priority, status_code, status_phrase,
res_hdrs, res_body_pause): |
| 138 "Start a response. Must only be called once per response." |
| 139 self.log.debug("res_start %s" % stream_id) |
| 140 self._res_body_pause_cb = res_body_pause |
| 141 res_hdrs.append(('status', "%s %s" % (status_code, status_phrase))) |
| 142 # TODO: hop-by-hop headers? |
| 143 self._queue_frame(stream_priority, self._ser_syn_frame(CTL_SYN_REPLY, FL
AG_NONE, stream_id, res_hdrs)) |
| 144 def res_body(*args): |
| 145 return self.res_body(stream_id, stream_priority, *args) |
| 146 def res_done(*args): |
| 147 return self.res_done(stream_id, stream_priority, *args) |
| 148 return res_body, res_done |
| 149 |
| 150 def res_body(self, stream_id, stream_priority, chunk): |
| 151 "Send part of the response body. May be called zero to many times." |
| 152 if chunk: |
| 153 do_chunking = True |
| 154 if stream_priority == 0: |
| 155 do_chunking = True |
| 156 if do_chunking: |
| 157 kMaxChunkSize = 1460 * 4 |
| 158 start_pos = 0 |
| 159 chunk_size = len(chunk) |
| 160 while start_pos < chunk_size: |
| 161 size = min(chunk_size - start_pos, kMaxChunkSize) |
| 162 self._queue_frame(stream_priority, self._ser_data_frame(stre
am_id, FLAG_NONE, chunk[start_pos:start_pos + size])) |
| 163 start_pos += size |
| 164 else: |
| 165 self._queue_frame(stream_priority, self._ser_data_frame(stream_i
d, FLAG_NONE, chunk)) |
| 166 |
| 167 def res_done(self, stream_id, stream_priority, err): |
| 168 """ |
| 169 Signal the end of the response, whether or not there was a body. MUST be |
| 170 called exactly once for each response. |
| 171 |
| 172 If err is not None, it is an error dictionary (see the error module) |
| 173 indicating that an HTTP-specific (i.e., non-application) error occured |
| 174 in the generation of the response; this is useful for debugging. |
| 175 """ |
| 176 self._queue_frame(stream_priority, self._ser_data_frame(stream_id, FLAG_
FIN, "")) |
| 177 # TODO: delete stream after checking that input side is half-closed |
| 178 |
| 179 def req_body_pause(self, paused): |
| 180 "Indicate that the server should pause (True) or unpause (False) the req
uest." |
| 181 if self._tcp_conn and self._tcp_conn.tcp_connected: |
| 182 self._tcp_conn.pause(paused) |
| 183 |
| 184 # Methods called by push_tcp |
| 185 |
| 186 def _res_body_pause(self, paused): |
| 187 "Pause/unpause sending the response body." |
| 188 if self._res_body_pause_cb: |
| 189 self._res_body_pause_cb(paused) |
| 190 |
| 191 def _conn_closed(self): |
| 192 "The server connection has closed." |
| 193 pass # FIXME: any cleanup necessary? |
| 194 # self.pause() |
| 195 # self.tcp_conn.handler = None |
| 196 # self.tcp_conn = None |
| 197 |
| 198 def _has_write_data(self): |
| 199 for index in range(0, 4): |
| 200 if len(self.write_queue[index]) > 0: |
| 201 return True |
| 202 return False |
| 203 |
| 204 def _write_frame_callback(self): |
| 205 self.write_pending = False |
| 206 |
| 207 # Find the highest priority data chunk and send it. |
| 208 for index in range(0, 4): |
| 209 if len(self.write_queue[index]) > 0: |
| 210 data = self.write_queue[index][0] |
| 211 self.write_queue[index] = self.write_queue[index][1:] |
| 212 self._output(data) |
| 213 break |
| 214 if self._has_write_data(): |
| 215 self._schedule_write() |
| 216 |
| 217 def _schedule_write(self): |
| 218 # We only need one write scheduled at a time. |
| 219 if not self.write_pending: |
| 220 push_tcp.schedule(0, self._write_frame_callback) |
| 221 self.write_pending = True |
| 222 |
| 223 def _queue_frame(self, priority, chunk): |
| 224 self.write_queue[priority].append(chunk) |
| 225 self._schedule_write() |
| 226 |
| 227 # Methods called by common.SpdyRequestHandler |
| 228 |
| 229 def _output(self, chunk): |
| 230 if self._tcp_conn: |
| 231 self._tcp_conn.write(chunk) |
| 232 |
| 233 def _input_start(self, stream_id, stream_priority, hdr_tuples): |
| 234 self.log.debug("request start %s %s" % (stream_id, hdr_tuples)) |
| 235 method = get_hdr(hdr_tuples, 'method')[0] # FIXME: error handling |
| 236 uri = get_hdr(hdr_tuples, 'url')[0] # FIXME: error handling |
| 237 assert not self._streams.has_key(stream_id) # FIXME |
| 238 def res_start(*args): |
| 239 return self.res_start(stream_id, stream_priority, *args) |
| 240 # TODO: sanity checks / catch errors from requst_handler |
| 241 self._streams[stream_id] = self.request_handler( |
| 242 method, uri, hdr_tuples, res_start, self.req_body_pause) |
| 243 |
| 244 def _input_body(self, stream_id, chunk): |
| 245 "Process a request body chunk from the wire." |
| 246 if self._streams.has_key(stream_id): |
| 247 self._streams[stream_id][0](chunk) |
| 248 |
| 249 def _input_end(self, stream_id): |
| 250 "Indicate that the request body is complete." |
| 251 if self._streams.has_key(stream_id): |
| 252 self._streams[stream_id][1](None) |
| 253 # TODO: delete stream if output side is half-closed. |
| 254 |
| 255 def _input_error(self, stream_id, err, detail=None): |
| 256 "Indicate a parsing problem with the request body." |
| 257 # FIXME: rework after fixing spdy_common |
| 258 err['detail'] = detail |
| 259 if self._tcp_conn: |
| 260 self._tcp_conn.close() |
| 261 self._tcp_conn = None |
| 262 if self._streams.has_key(stream_id): |
| 263 self._streams[stream_id][1](err) |
| 264 |
| 265 # TODO: re-evaluate if this is necessary in SPDY |
| 266 def _handle_error(self, err, detail=None): |
| 267 "Handle a problem with the request by generating an appropriate response
." |
| 268 if detail: |
| 269 err['detail'] = detail |
| 270 status_code, status_phrase = err.get('status', ('400', 'Bad Request')) |
| 271 hdrs = [ |
| 272 ('Content-Type', 'text/plain'), |
| 273 ] |
| 274 body = err['desc'] |
| 275 if err.has_key('detail'): |
| 276 body += " (%s)" % err['detail'] |
| 277 self.res_start(status_code, status_phrase, hdrs, dummy) |
| 278 self.res_body(body) |
| 279 self.res_done() |
| 280 |
| 281 |
| 282 def test_handler(method, uri, hdrs, res_start, req_pause): |
| 283 """ |
| 284 An extremely simple (and limited) server request_handler. |
| 285 """ |
| 286 code = "200" |
| 287 phrase = "OK" |
| 288 res_hdrs = [('Content-Type', 'text/plain'), ('version', 'HTTP/1.1')] |
| 289 res_body, res_done = res_start(code, phrase, res_hdrs, dummy) |
| 290 res_body('This is SPDY.') |
| 291 res_done(None) |
| 292 return dummy, dummy |
| 293 |
| 294 |
| 295 if __name__ == "__main__": |
| 296 logging.basicConfig() |
| 297 log = logging.getLogger('server') |
| 298 log.setLevel(logging.INFO) |
| 299 log.info("PID: %s\n" % os.getpid()) |
| 300 h, p = '127.0.0.1', int(sys.argv[1]) |
| 301 server = SpdyServer(h, p, test_handler, log) |
| 302 push_tcp.run() |
| OLD | NEW |