/[glsr]/trunk/harmonious/server/server.py
Gentoo

Contents of /trunk/harmonious/server/server.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 109 - (show annotations) (download) (as text)
Mon Aug 1 14:40:49 2005 UTC (9 years ago) by port001
File MIME type: text/x-python
File size: 23030 byte(s)
Added a 'flush2disk' mode to the SessionStorage class which pickles session data to disk when shut down, and unpickles it on startup.

1 #!/usr/bin/python
2
3 __id__ = '$Id$'
4 __modulename__ = 'Harmonious'
5
6 import os
7 import sys
8 import copy
9 import time
10 import Queue
11 import socket
12 import cStringIO
13 import traceback
14 import threading
15 import SocketServer
16 import BaseHTTPServer
17 from copy import deepcopy
18 from types import ListType
19
20 import harmonious
21 from harmonious import server
22 from harmonious._config import Config
23 from harmonious._response import Response
24 from harmonious._dispatcher import Dispatcher
25 from harmonious._sessionutils import SessionStorage
26 from harmonious._logging import ErrorLog, AccessLog
27 from harmonious._requestfactory import RequestFactory
28 from harmonious._module import load_module, map_hosts, map_modules
29
30 DEFAULT_ERROR_MESSAGE = """\
31 <head>
32 <title>Error response</title>
33 </head>
34 <body>
35 <h1>Error response</h1>
36 <p>Error code %(code)d.
37 <p>Message: %(message)s.
38 <p>Error code explanation: %(code)s = %(explain)s.
39 </body>
40 """
41
def distinguish_host(host_header):
    """
    Resolve the value of a Host header to the name of a configured host.

    The header is lower-cased and anything from the first ':' onwards (the
    port number) is discarded.  If the remaining name is one of the keys of
    server.config.hosts it is returned, otherwise the server-wide default
    server.config.server['Host'] is returned.
    """

    # The port number plays no part in choosing a host.
    hostname = host_header.lower().split(':', 1)[0]

    if hostname not in server.config.hosts.keys():
        return server.config.server['Host']

    return hostname
58
59 class ThreadedServer(SocketServer.ThreadingTCPServer):
60 """
61 Inherit the ThreadingTCPServer class so we can handle each connection in
62 a separate thread.
63 """
64
65 allow_reuse_address = True
66
67 __weekdayname = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
68 __monthname = [None,
69 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
70 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
71
72 __req_threads = []
73 __reqQueue = Queue.Queue()
74
def server_activate(self):
    """
    Create the pool of worker threads, start them, then begin listening.

    The pool size comes from server.config.server['MinThreads'].  Every
    worker is handed the shared request queue.
    """

    print("server_activate")

    pool_size = int(server.config.server['MinThreads'])
    self.__req_threads.extend(
        [RequestThread(self.__reqQueue) for _ in xrange(pool_size)])

    for worker in self.__req_threads:
        worker.start()
    print("done")

    self.socket.listen(self.request_queue_size)
89
def date_time_string(self):
    """
    Format the current UTC time for use in a message header, for example
    'Mon, 01 Aug 2005 14:40:49 GMT'.

    Day and month names come from the class-level tables rather than
    strftime().  Borrowed from BaseHTTPServer.BaseHTTPRequestHandler.
    """

    utc = time.gmtime(time.time())

    return "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
        self.__weekdayname[utc.tm_wday],
        utc.tm_mday, self.__monthname[utc.tm_mon], utc.tm_year,
        utc.tm_hour, utc.tm_min, utc.tm_sec)
103
def serve_forever(self):
    """
    Handle requests until interrupted from the keyboard, then retire the
    worker threads.

    On KeyboardInterrupt one all-None tuple is queued per worker (the
    signal for a worker to exit) and every worker is joined before
    returning.
    """

    try:
        while True:
            self.handle_request()
    except KeyboardInterrupt:
        # One sentinel per worker, so that each of them wakes up and exits.
        for _ in self.__req_threads:
            self.__reqQueue.put((None,) * 5)

        for worker in self.__req_threads:
            worker.join()
116
# process_request_thread: receive loop for a single client connection.
# Presumably invoked by SocketServer's ThreadingMixIn in its own thread for
# each accepted connection - confirm against the Python version in use.
#
# Visible behaviour:
#   * registers the connection in server._connections, keyed by the client's
#     source port (client_address[1]);
#   * reads from the socket in chunks of up to 1024 bytes, splits them into
#     lines and parses a request line, headers and (for POST) a body;
#   * wraps the collected text in a cStringIO object, puts it on the shared
#     request queue and blocks on a threading.Event until it is set;
#   * leaves the loop unless keep_alive is true, otherwise resets the parser
#     state for the next request;
#   * finally removes the connection from server._connections.
#
# NOTE(review): this listing has lost its indentation, so the nesting of the
# statements below cannot be confirmed from here; the comments describe the
# statements, not their exact nesting.
117 def process_request_thread(self, request, client_address):
118
119 # Check request type POST/GET etc
120 # if it's GET, just look for headers
121 # if it's POST, look for headers, then look for Content-Length
122
# Nested helper: writes a complete "400 Bad request" response (status line,
# headers, HTML body built from DEFAULT_ERROR_MESSAGE) to the client socket.
123 def __bad_request(line):
124 """
125 Send the client a 400 Bad request response
126 """
127
128 request_msg = (DEFAULT_ERROR_MESSAGE %
129 {'code': 400, 'message': 'Bad request', 'explain': "Bad request syntax: %s" % line})
130 request.send('HTTP/1.1 400 Bad request\r\n')
# NOTE(review): the header below is sent with a literal backslash
# ("Harmonious\<version>"); RequestHandler.server_version uses
# "Harmonious/%s", so '/' was probably intended.
131 request.send("Server: Harmonious\%s\r\n" % harmonious.__version__)
132 request.send("Date: %s\r\n" % self.date_time_string())
133 request.send('Content-Type: text/html\r\n')
134 request.send("Content-Length: %d\r\n" % len(request_msg))
135 #request.send('Connection: close\r\n')
136 request.send('\r\n')
137 request.send(request_msg)
138
# Parser state for this connection.  `buffer` holds received lines that have
# not been consumed yet; `tmp_buffer` holds bytes read beyond Content-Length.
139 header_data = ''
140 body_data = ''
141 request_data = ''
142 raw_request_line = ''
143 keep_alive = False
144 keep_alive_value = 10 # Default value only used whilst waiting for initial request
145 http_version = ''
146 tmp_buffer = ''
147 buffer = []
148 in_request = False
149 got_request = False
150 got_headers = False
151 got_body = False
152 #reqEvent = threading.Event()
153
# Register the connection.  NOTE(review): the lock is released without a
# try/finally, so an exception between acquire() and release() would leave
# it held.
154 server._connections_lock.acquire()
155 server._connections[client_address[1]] = {'addr': client_address[0], 'count': 0}
156 server._connections_lock.release()
157
158 print "NEW CONNECTION"
159
160 while 1:
161
162 in_buffer = ''
163
164 if tmp_buffer != '':
165 buffer = [tmp_buffer]
166 tmp_buffer = ''
167
168 if len(buffer) == 0:
169
# Between requests the socket timeout is keep_alive_value seconds; while a
# request is being read the timeout is disabled.
170 if not in_request:
171 print "TIMEOUT", keep_alive_value
172 request.settimeout(keep_alive_value)
173 else:
174 request.settimeout(None)
175
176 #buffer.seek(0)
177 try:
178 # 1024 is big enough for most requests, except for maybe
179 # large file uploads. In which case, we save the incomplete
180 # block for the next recv().
181 (in_buffer, address) = request.recvfrom(1024)
182 #print "IN_BUFFER %s:" % address, in_buffer, '|'
183 except socket.timeout:
184 self.close_request(request)
185 print "Break 1"
186 break
187 except socket.error, e:
188 print "SOCKET ERROR:", e
189 break
190
# An empty read is treated as the peer having closed the connection.
191 if not in_buffer:
192 self.close_request(request)
193 break
194
195 #in_buffer = "%s%s" % (tmp_buffer, in_buffer)
196 #tmp_buffer = ''
197 lines = in_buffer.split('\n')
198
199 #print "LINES", lines
200
201 i = 0
202
# Re-append the split lines to `buffer`, restoring the '\n' that split()
# removed; the final fragment is handled separately.
# NOTE(review): which `if` each `else` below belongs to cannot be recovered
# from this listing, and the possible pairings treat a trailing partial
# line differently - check against the repository copy.
203 for line in lines:
204 i = i + 1
205 if i == len(lines):
206 if in_buffer.endswith('\n'):
207 if line.endswith('\r'):
208 buffer.append("%s\n" % line)
209 else:
210 buffer.append("%s" % line)
211 else:
212 buffer.append("%s\n" % line)
213
214 #print buffer
215
# Stage 1: the request line ("COMMAND path [HTTP/x.y]").
216 if not got_request:
217
218 raw_request_line = buffer.pop(0)
219
220 if raw_request_line != '':
221 request_line = raw_request_line.strip('\r\n')
222 else:
223 self.close_request(request)
224 print "Break 2"
225 break
226
227 words = request_line.split()
228
229 #print "WORDS", words
230
231 if len(words) == 3:
232
233 (command, path, version) = words
234
235 if version == 'HTTP/1.0':
236 request_version = '1.0'
237 elif version == 'HTTP/1.1':
238 request_version = '1.1'
239 elif version == 'HTTP/0.9':
240 request_version = '0.9'
241 else:
242 __bad_request("Unknown HTTP version '%s'" % version)
243 #self.close_request(request)
# NOTE(review): the message says "Break 3" but the statement is `continue`,
# unlike the "Break 4" branch below - confirm which was intended.
244 print "Break 3"
245 continue
246
247 elif len(words) == 2:
248 # 0.9
249 (command, path) = words
250 request_version = '0.9'
251 else:
252 __bad_request(request_line)
253 #self.close_request(request)
254 print "Break 4"
255 break
256
257 in_request = True
258 print "Got new request line"
259 got_request = True
260
# Stage 2: headers.  Raw header text accumulates in header_data; a bare
# '\r\n' or '\n' line marks the end of the headers.
261 if not got_headers:
262
263 # Read the headers
264 if len(buffer) == 0:
265 continue
266
267 header = buffer.pop(0)
268 #pos = pos + 1
269
270 if header == '':
271 print "CONTINUE"
272 continue
273
274 #print "Header1: |%s|" % header
275 header_data = "%s%s" % (header_data, header)
276
277 if header in ('\r\n', '\n'):
278 # End of headers
279 #print "End of headers"
280 got_headers = True
281 #continue
282
283 conn_found = False
284 conn_value = ''
285 ka_found = False
286 cl_found = False
287 cl_value = 0
288
289 if not got_headers:
290
291 while 1:
292
293 try:
294 header = buffer.pop(0)
295 except IndexError:
296 #got_headers = True
297 print "break 6"
298 break
299
300 #print "Header2: |%s|" % header
301
302 header_data = "%s%s" % (header_data, header)
303
304 if header in ('\r\n', '\n'):
305 # End of headers
306 #print "End of headers"
307 got_headers = True
308 break
309
310 header = header.strip('\r\n')
311
# NOTE(review): the header-name matches below are case-sensitive, and
# split(':') is unpacked into exactly two names, so a value that itself
# contains ':' raises ValueError.
312 # Look for a Connection header
313 if header.startswith('Connection:') and not conn_found:
314 (conn, conn_value) = header.split(':')
315 conn_value = conn_value.lstrip(' ').lower()
316 conn_found = True
317
318 # Look for a Keep-Alive header
319 elif header.startswith('Keep-Alive:') and not ka_found:
320 (ka, keep_alive_value) = header.split(':')
321 keep_alive_value = keep_alive_value.lstrip(' ')
322 #print "KP FOUND", keep_alive_value
323 ka_found = True
324
325 elif command == 'POST' and not cl_found:
326 # Look for a Content-Length header
327 if header.startswith('Content-Length:'):
328 (cl, cl_value) = header.split(':')
329 cl_value = cl_value.lstrip(' ')
330 cl_found = True
331
332 if not got_headers:
333 continue
334
# Decide whether the connection is persistent and for how long.
# keep_alive_value is capped at 300 and defaults to 10.
335 if conn_found:
336
337 if conn_value == 'close':
338 keep_alive = False
339 elif conn_value == 'keep-alive':
340 if request_version == '0.9':
341 # HTTP/0.9 does not support persistent connections
342 keep_alive = False
343 else:
344 keep_alive = True
345
346 if not ka_found:
347 print "KA NOT FOUND"
348 keep_alive_value = 10
349 else:
350 try:
351 keep_alive_value = int(keep_alive_value)
352 if keep_alive_value > 300:
353 keep_alive_value = 300
354 #print "KEEP ALIVE", keep_alive_value
355 except ValueError:
356 __bad_request("Keep-Alive: %s" % keep_alive_value)
357 #self.close_request(request)
358 break
359
360 else:
361 print request_version
362 # HTTP/1.1 specifies that all connections are persistent
363 # unless a 'Connection: close' header is sent.
364 if request_version == '1.1':
365 keep_alive = True
366 keep_alive_value = 10
367 else:
368 keep_alive = False
369
# Stage 3: POST body.  A Content-Length header is mandatory; buffered lines
# are appended to body_data until Content-Length bytes have been collected,
# and any surplus is carried over in tmp_buffer.
370 if command == 'POST' and not got_body:
371
372 print "PARSING POST"
373
374 print buffer
375
376 if not cl_found:
377 __bad_request('Missing Content-Length header')
378 #self.close_request(request)
379 break
380
381 try:
382 cl_value = int(cl_value)
383 except ValueError:
384 __bad_request("Content-Length: %s" % cl_value)
385 #self.close_request(request)
386 break
387
# NOTE(review): when the buffer runs dry ("break 5") got_body is still
# False, and no statement visible here returns to recv() before the request
# is dispatched below - confirm whether partial bodies are handled.
388 while 1:
389
390 try:
391 line = buffer.pop(0)
392 except IndexError:
393 print "break 5"
394 #got_body = True
395 break
396
397 if line in ('\r\n', '\n'):
398 got_body = True
399 break
400
401 if (len(line) + len(body_data)) <= cl_value:
402 # We can use the whole line
403 body_data = "%s%s" % (body_data, line)
404 else:
405 # Read part of the line
406 pos = cl_value - len(body_data)
407 body_data = "%s%s" % (body_data, line[:pos])
408 tmp_buffer = "%s%s" % (tmp_buffer, line[pos:])
409 print "TMP_BUFFER", tmp_buffer
410
411 if len(body_data) == cl_value:
412 got_body = True
413 break
414 elif len(body_data) > cl_value:
415 print "READ TOO MUCH BODY!!"
416 break
417
# Stage 4: dispatch.  The per-connection request counter is incremented and
# the reassembled request text is queued for a worker thread.
418 print "Got complete request from %d" % client_address[1]
419
420 # We have a complete request, give it to a worker thread for processing.
421 server._connections_lock.acquire()
422 server._connections[client_address[1]]['count'] = server._connections[client_address[1]]['count'] + 1
423 server._connections_lock.release()
424
425 request_data = "%s%s%s" % (raw_request_line, header_data, body_data)
426 complete_request = cStringIO.StringIO(request_data) # Returns an InputType
427
428 #reqEvent.clear()
429 reqEvent = threading.Event()
430 reqEvent.clear()
431
432 self.__reqQueue.put((reqEvent, request, client_address, complete_request, (keep_alive, keep_alive_value)))
433
# wait() is called without a timeout, so this thread blocks until the event
# is set.
434 # Wait for the request to finish before receiving the next
435 print "%d waiting on event..." % client_address[1]
436 reqEvent.wait()
437 print "Got event for %d" % client_address[1]
438
439 if not keep_alive:
440 # Don't close the request here because a request may still be being processed,
441 # instead close in _dispatcher.dispatch.
442 break
443
# Persistent connection: reset the parser state for the next request.
444 request_line = ''
445 header_data = ''
446 body_data = ''
447 complete_request = None
448 in_request = False
449 got_request = False
450 got_headers = False
451 got_body = False
452
# Unregister the connection (same key as at registration).
453 server._connections_lock.acquire()
454 server._connections.pop(client_address[1])
455 server._connections_lock.release()
456
457 print "Closing connection", client_address[1]
458 #self.close_request(request)
459
class RequestThread(threading.Thread):
    """
    Worker thread that services complete requests taken from a shared queue.

    Every queue item is a 5-tuple
    (reqEvent, request, client_address, request_data, keep_alive):

    reqEvent       -- threading.Event set once the request has been dealt with
    request        -- the client socket
    client_address -- (host, port) of the client
    request_data   -- file-like object holding the raw request text
    keep_alive     -- keep-alive information passed on to RequestHandler

    A tuple of five None values tells the worker to exit.
    """

    def __init__(self, reqQueue):
        """
        reqQueue -- queue this worker blocks on while waiting for requests
        """

        threading.Thread.__init__(self)
        self.__reqQueue = reqQueue

    def run(self):
        """
        Take requests off the queue and process them until the all-None
        sentinel arrives.

        reqEvent is set in a finally clause, so the thread that queued the
        request is always released, even when error handling itself fails.
        """

        while 1:

            thread_name = threading.currentThread().getName()

            print("Thread '%s' waiting for request..." % thread_name)
            job = self.__reqQueue.get()
            (reqEvent, request, client_address, request_data, keep_alive) = job

            if job == (None, None, None, None, None):
                print("Thread '%s' resigning from duty..." % thread_name)
                return

            print("Thread '%s' handling request for %d" % (thread_name, client_address[1]))

            try:
                if self.verify_request(request, client_address):
                    self.__handle(request, client_address, request_data, keep_alive)

                print("Thread '%s' completed request for %d" % (thread_name, client_address[1]))
            finally:
                # The queuing thread waits on this event without a timeout;
                # failing to set it would block that connection forever.
                reqEvent.set()

    def __handle(self, request, client_address, request_data, keep_alive):
        """
        Process one request, logging any failure instead of propagating it.
        """

        try:
            self.process_request(request, client_address, request_data, keep_alive)
        except socket.error:
            # The client probably closed the connection mid-request.
            # sys.exc_info() is used so the same code runs on any Python 2.
            print("!!!!!!!!!!SOCKET ERROR %s" % (sys.exc_info()[1],))
        except:
            # Deliberately broad: a worker must outlive any handler failure.
            print("error")
            self.handle_error(request, client_address)
            try:
                self.close_request(request)
            except socket.error:
                # shutdown() fails if the peer has already disconnected;
                # there is nothing left to close in that case.
                pass

    def verify_request(self, request, client_address):
        """
        Say whether the request should be processed; always True here.
        """

        return True

    def process_request(self, request, client_address, request_data, keep_alive):
        """
        Handle the request by instantiating RequestHandler, whose constructor
        does the work.
        """

        RequestHandler(request, client_address, request_data, keep_alive)

    def close_request(self, request):
        """
        Shut down both directions of the client socket.

        Raises socket.error if the socket is no longer connected.
        """

        # We have to call shutdown() instead of close() here because close() does not
        # break the blocking recv() call.
        print("IN CLOSE_REQUEST!!")
        request.shutdown(socket.SHUT_RDWR)

    def handle_error(self, request, client_address):
        """
        Write the traceback of the exception currently being handled to the
        error log, and to stderr as well when the Debug option is on.
        """

        rule = '-' * 40
        error_str = ''.join([
            rule,
            "\nException raised during processing of request from %s\n" % str(client_address),
            traceback.format_exc(),
            rule])

        print >> server._errorLog, error_str

        if server.config.server['Debug'] == True:
            print >> sys.stderr, error_str
525
class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """
    HTTP request handler fed from request text that has already been read
    off the socket.

    Unlike the stock handler it is constructed with the raw request as a
    file-like object, so nothing is read from the client socket here; the
    socket is used for writing the response only.
    """

    protocol_version = 'HTTP/1.1'
    server_version = "Harmonious/%s" % harmonious.__version__

    def __init__(self, request, client_address, request_data, keep_alive):
        """
        Override __init__ so that we can accept extra parameters 'request_data'
        and Keep-Alive data

        request        -- the client socket
        client_address -- (host, port) of the client
        request_data   -- file-like object holding the raw request text
        keep_alive     -- keep-alive information handed to RequestFactory

        The request is handled before the constructor returns.
        """

        self.request = request
        self.client_address = client_address
        self.request_data = request_data
        self.keep_alive = keep_alive

        try:
            self.setup()
            self.handle()
            self.finish()
        finally:
            sys.exc_traceback = None

    def setup(self):
        """
        Override setup so we can create rfile and wfile from our already
        received request data.
        """

        self.wfile = self.request.makefile('wb', 0)
        self.rfile = self.request_data

    def __distinguish_host(self, host_header):
        """
        Map a Host header value to a configured host name.

        Delegates to the module-level distinguish_host() so that the lookup
        rules live in one place.
        """

        return distinguish_host(host_header)

    def __escape_html(self, text):
        """
        Return text with '&', '<' and '>' replaced by HTML entities.
        """

        return text.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')

    def __perform_request(self):
        """
        Dispatch the request, or answer 400 and shut the socket down when
        no Host header was sent.
        """

        if self.headers.has_key('host'):
            harm = self.__create_harm()
            Dispatcher(harm).dispatch()
        else:
            self.send_error(400, "Host header required for %s communication" % self.request_version)
            self.close_request(self.request)

    def __create_harm(self):
        """
        Build the object handed to the Dispatcher: the harmonious/_harm
        module with request, session and response attributes set on it.
        """

        # NOTE(review): __import__ returns the module cached in sys.modules,
        # so all worker threads appear to share this one object and could
        # overwrite each other's attributes mid-request - confirm.
        harm = __import__('harmonious/_harm')
        host = self.__distinguish_host(self.headers['Host'])
        (harm.request, harm.session) = RequestFactory(self, host, self.keep_alive).construct()
        harm.response = Response()

        return harm

    def do_GET(self):
        """Handle a GET request."""

        print(self.headers)
        self.__perform_request()

    def do_HEAD(self):
        """Handle a HEAD request."""

        self.__perform_request()

    def do_POST(self):
        """Handle a POST request."""

        self.__perform_request()

    def version_string(self):
        """Return the value sent in the Server response header."""

        return self.server_version

    def send_error(self, code, message=None, close=False):
        """
        Override send_error so that the 'Connection: close' header is optional

        code    -- HTTP status code
        message -- short description; defaults to the standard one for code
        close   -- when true, also send 'Connection: close'

        The message is HTML-escaped before it goes into the response body,
        because callers may build it from client-supplied text.  The status
        line and the log entry receive the message unchanged.
        """

        try:
            short_msg, long_msg = self.responses[code]
        except KeyError:
            short_msg, long_msg = '???', '???'
        if message is None:
            message = short_msg
        explain = long_msg
        self.log_error("code %d, message %s", code, message)
        content = (self.error_message_format %
                   {'code': code,
                    'message': self.__escape_html(message),
                    'explain': explain})
        self.send_response(code, message)
        self.send_header("Content-Type", "text/html")
        self.send_header("Content-Length", len(content))
        if close:
            self.send_header('Connection', 'close')
        self.end_headers()
        # No body for HEAD requests or for statuses that must not carry one.
        if self.command != 'HEAD' and code >= 200 and code not in (204, 304):
            self.wfile.write(content)

    def log_message(self, format, *args):
        """
        Write one line to the access log, and to stderr as well when the
        Debug option is on.
        """

        log_string = "%s - - [%s] %s" % (self.address_string(),
                                         self.log_date_time_string(),
                                         format % args)

        print >> server._accessLog, log_string

        if server.config.server['Debug'] == True:
            print >> sys.stderr, log_string

    def close_request(self, request):
        """
        Shut down both directions of the client socket.
        """

        # We have to call shutdown() instead of close() here because close() does not
        # break the blocking recv() call.
        print("IN CLOSE_REQUEST")
        request.shutdown(socket.SHUT_RDWR)
651
# Script entry point: read the configuration named on the command line, open
# the logs, load the session store if sessions are enabled, map the
# configured hosts and serve until interrupted.
if __name__ == '__main__':

    print("\n%s Starting up...\n" % __modulename__)

    server._startup_time = time.time()

    # The configuration file is the only command line argument.
    if len(sys.argv) < 2:
        print >> sys.stderr, "Usage: %s <config file>" % sys.argv[0]
        sys.exit(1)

    #print "Reading configuration..."

    server.config = Config(sys.argv[1])
    server.config._read_config()

    server._errorLog = ErrorLog(server.config.logging['ErrorLog'])
    server._accessLog = AccessLog(server.config.logging['AccessLog'])

    if server.config.session['SessionEnable'] == True:
        server._sessionStore = SessionStorage(server.config.session['StorageType'])
        server._sessionStore.load_store()
    else:
        server._sessionStore = None

    map_hosts()

    threadedServer = ThreadedServer((server.config.server['Host'], server.config.server['Port']), RequestHandler)
    print("serve_forever")
    threadedServer.serve_forever()

    # Daemonisation (double fork), currently disabled:
    #
    # try:
    #     pid = os.fork()
    #     if pid > 0:
    #         sys.exit(0)
    # except OSError, e:
    #     print >> sys.stderr, "Failed to fork daemon process: %d '%s'" % (e.errno, e.strerror)
    #     sys.exit(1)

    # os.chdir("/")
    # os.setsid()
    # os.umask(0)

    # try:
    #     pid = os.fork()
    #     if pid > 0:
    #         print "PID: %d" % pid
    #         sys.exit(0)
    # except OSError, e:
    #     print >> sys.stderr, "Failed to fork daemon process: %d '%s'" % (e.errno, e.strerror)
    #     sys.exit(1)

    # serve_forever() has returned, so the server is shutting down.  The
    # store is None when sessions are disabled, so only save a real store.
    if server._sessionStore is not None:
        server._sessionStore.save_store()

    # NOTE(review): these messages are emitted after serve_forever() has
    # returned, i.e. at shutdown; they look like leftovers from the disabled
    # daemon code above - confirm the intended wording.
    print("Server Online.")
    print >> server._accessLog, "Server Online"

Properties

Name Value
svn:keywords Id

  ViewVC Help
Powered by ViewVC 1.1.20