[Opensrf-commits] r2125 - branches/rel_2_0/src/python/osrf (erickson)
svn at svn.open-ils.org
svn at svn.open-ils.org
Mon Dec 13 09:54:10 EST 2010
Author: erickson
Date: 2010-12-13 09:54:08 -0500 (Mon, 13 Dec 2010)
New Revision: 2125
Modified:
branches/rel_2_0/src/python/osrf/server.py
Log:
improved select/read/write fault tolerance; cleaner and more efficient child process idle/active list management; improved logging
Modified: branches/rel_2_0/src/python/osrf/server.py
===================================================================
--- branches/rel_2_0/src/python/osrf/server.py 2010-12-13 14:53:47 UTC (rev 2124)
+++ branches/rel_2_0/src/python/osrf/server.py 2010-12-13 14:54:08 UTC (rev 2125)
@@ -1,5 +1,5 @@
# -----------------------------------------------------------------------
-# Copyright (C) 2008 Equinox Software, Inc.
+# Copyright (C) 2008-2010 Equinox Software, Inc.
# Bill Erickson <erickson at esilibrary.com>
#
# This program is free software; you can redistribute it and/or
@@ -42,10 +42,12 @@
self.keepalive = 0 # how long to wait for subsequent, stateful requests
self.active_list = [] # list of active children
self.idle_list = [] # list of idle children
+ self.pid_map = {} # map of pid -> child object for faster access
# Global status socketpair. All children relay their
# availability info to the parent through this socketpair.
self.read_status, self.write_status = socket.socketpair()
+ self.read_status.setblocking(0)
def load_app(self):
settings = osrf.set.get('activeapps.%s' % self.service)
@@ -54,7 +56,7 @@
def cleanup(self):
''' Closes management sockets, kills children, reaps children, exits '''
- osrf.log.log_info("Shutting down...")
+ osrf.log.log_info("server: shutting down...")
self.cleanup_routers()
self.read_status.shutdown(socket.SHUT_RDWR)
@@ -67,8 +69,8 @@
child.write_data.shutdown(socket.SHUT_RDWR)
child.read_data.close()
child.write_data.close()
+ os.kill(child.pid, signal.SIGKILL)
- os.kill(0, signal.SIGKILL)
self.reap_children(True)
os._exit(0)
@@ -101,90 +103,95 @@
self.register_routers()
try:
- osrf.log.log_debug("entering main server loop...")
+ osrf.log.log_internal("server: entering main server loop...")
+
while True: # main server loop
self.reap_children()
self.check_status()
data = self.osrf_handle.recv(-1).to_xml()
+ child = None
- if self.try_avail_child(data):
- continue
+ if len(self.idle_list) > 0:
+ child = self.idle_list.pop()
+ self.active_list.append(child)
+ osrf.log.log_internal("server: sending data to available child %d" % child.pid)
- if self.try_new_child(data):
- continue
+ elif self.num_children < self.max_children:
+ child = self.spawn_child(True)
+ osrf.log.log_internal("server: sending data to new child %d" % child.pid)
- self.try_wait_child()
+ else:
+ osrf.log.log_warn("server: no children available, waiting...")
+ child = self.check_status(True)
+ self.write_child(child, data)
+
except KeyboardInterrupt:
+ osrf.log.log_info("server: exiting with keyboard interrupt")
+
+ except Exception, e:
+ osrf.log.log_error("server: exiting with exception: %s" % e.message)
+
+ finally:
self.cleanup()
- #except Exception, e:
- #osrf.log.log_error("server exiting with exception: %s" % e.message)
- #self.cleanup()
- def try_avail_child(self, data):
- ''' Trys to send current request data to an available child process '''
+ def write_child(self, child, data):
+ ''' Sends data to the child process '''
- if len(self.idle_list) == 0:
+ try:
+ child.write_data.sendall(data)
+
+ except Exception, e:
+ osrf.log.log_error("server: error sending data to child %d: %s" % (child.pid, str(e)))
+ self.cleanup_child(child.pid, True)
return False
- child = self.idle_list.pop(0) # remove from idle list
- osrf.log.log_internal("sending data to available child %d" % child.pid)
- self.write_child(child, data)
- self.active_list.insert(0, child) # add to active list
return True
- def try_new_child(self, data):
- ''' Tries to spawn a new child to send request data to '''
- osrf.log.log_debug("try_new_child: service=%s num_children=%s max_children=%s" % (self.service, self.num_children, self.max_children))
- if self.num_children < self.max_children:
- osrf.log.log_internal("spawning new child to handle data")
- child = self.spawn_child(True)
- self.write_child(child, data)
- return True
- return False
+ def check_status(self, wait=False):
+ ''' Checks to see if any children have indicated they are done with
+ their current request. If wait is true, wait indefinitely
+ for a child to be free. '''
- def try_wait_child(self, data):
- ''' Waits for a child to become available '''
+ ret_child = None
- osrf.log.log_warn("No children available, waiting...")
- child = self.check_status(True)
- self.write_child(child, data)
+ if wait:
+ self.read_status.setblocking(1)
+ while True:
+ pid = None
- def write_child(self, child, data):
- ''' Sends data to the child process '''
- # Do we need to watch for sigpipe, etc?
- child.write_data.sendall(str(len(data)).rjust(SIZE_PAD) + data)
-
-
- def check_status(self, block=False):
- ''' Checks to see if any children have indicated they are done with
- their current request. If block is true, this will wait
- indefinitely for a child to be free. '''
-
- pid = None
- if block:
- pid = self.read_status.recv(SIZE_PAD)
- else:
try:
- self.read_status.setblocking(0)
pid = self.read_status.recv(SIZE_PAD)
+
except socket.error, e:
- if e.args[0] != errno.EAGAIN:
- raise e
- self.read_status.setblocking(1)
-
- if pid:
- pid = int(pid)
- child = [c for c in self.active_list if c.pid == pid][0]
- self.active_list.remove(child)
- self.idle_list.insert(0, child)
- return child
+ if e.args[0] == errno.EAGAIN:
+ break # no data left to read in nonblocking mode
- return None
+ osrf.log.log_error("server: child status check failed: %s" % str(e))
+ if not wait or ret_child:
+ break
+
+ finally:
+ if wait and ret_child:
+ # we've received a status update from at least
+ # 1 child. No need to block anymore.
+ self.read_status.setblocking(0)
+
+ if pid:
+ child = self.pid_map[int(pid)]
+ osrf.log.log_internal("server: child process %d reporting for duty" % child.pid)
+ if wait and ret_child is None:
+ # caller is waiting for a free child, leave it in the active list
+ ret_child = child
+ else:
+ self.active_list.remove(child)
+ self.idle_list.append(child)
+
+ return ret_child
def reap_children(self, done=False):
@@ -202,26 +209,36 @@
self.spawn_children()
return
- osrf.log.log_debug("reaping child %d" % pid)
+ osrf.log.log_internal("server: cleaning up child %d" % pid)
self.num_children -= 1
+ self.cleanup_child(pid)
- # locate the child in the active or idle list and remove it
- # Note: typically, a dead child will be in the active list, since
- # exiting children do not send a cleanup status to the controller
+ except OSError:
+ return
- child = [c for c in self.active_list if c.pid == pid]
- if len(child) > 0:
- self.active_list.remove(child[0])
- else:
- child = [c for c in self.idle_list if c.pid == pid]
- self.idle_list.remove(child[0])
+ def cleanup_child(self, pid, kill=False):
- except OSError:
- return
+ if kill:
+ os.kill(pid, signal.SIGKILL)
+
+ # locate the child in the active or idle list and remove it
+ # Note: typically, a dead child will be in the active list, since
+ # exiting children do not send a cleanup status to the controller
+
+ try:
+ self.active_list.pop(self.active_list.index(self.pid_map[pid]))
+ except:
+ try:
+ self.idle_list.pop(self.active_list.index(self.pid_map[pid]))
+ except:
+ pass
+
+ del self.pid_map[pid]
+
+
def spawn_children(self):
''' Launches up to min_children child processes '''
- osrf.log.log_debug("spawn_children: service=%s num_children=%s min_children=%s" % (self.service, self.num_children, self.min_children))
while self.num_children < self.min_children:
self.spawn_child()
@@ -232,20 +249,21 @@
child.read_data, child.write_data = socket.socketpair()
child.pid = os.fork()
- if child.pid:
+ if child.pid: # parent process
self.num_children += 1
+ self.pid_map[child.pid] = child
if active:
- self.active_list.insert(0, child)
+ self.active_list.append(child)
else:
- self.idle_list.insert(0, child)
- osrf.log.log_debug("service %s spawned child %d : %d total" % (self.service, child.pid, self.num_children))
+ self.idle_list.append(child)
+ osrf.log.log_internal("server: %s spawned child %d : %d total" % (self.service, child.pid, self.num_children))
return child
else:
child.pid = os.getpid()
child.init()
child.run()
osrf.net.get_network_handle().disconnect()
- osrf.log.log_internal("child exiting...")
+ osrf.log.log_internal("server: child exiting...")
os._exit(0)
def register_routers(self):
@@ -270,7 +288,7 @@
def register_router(self, target):
''' Registers with a single router '''
- osrf.log.log_info("registering with router %s" % target)
+ osrf.log.log_info("server: registering with router %s" % target)
self.routers.append(target)
reg_msg = osrf.net.NetworkMessage(
@@ -286,7 +304,7 @@
''' Un-registers with all connected routers '''
for target in self.routers:
- osrf.log.log_info("un-registering with router %s" % target)
+ osrf.log.log_info("server: un-registering with router %s" % target)
unreg_msg = osrf.net.NetworkMessage(
recipient = target,
body = 'un-registering...',
@@ -310,17 +328,46 @@
''' Loops, processing data, until max_requests is reached '''
while True:
+
try:
- size = int(self.read_data.recv(SIZE_PAD) or 0)
- data = self.read_data.recv(size)
- osrf.log.log_internal("recv'd data " + data)
+
+ self.read_data.setblocking(1)
+ data = ''
+
+ while True: # read all the data from the socket
+
+ buf = None
+ try:
+ buf = self.read_data.recv(2048)
+ except socket.error, e:
+ if e.args[0] == errno.EAGAIN:
+ break
+ osrf.log.log_error("server: child data read failed: %s" % str(e))
+ osrf.app.Application.application.child_exit()
+ return
+
+ if buf is None or buf == '':
+ break
+
+ data += buf
+ self.read_data.setblocking(0)
+
+ osrf.log.log_internal("server: child received message: " + data)
+
osrf.net.get_network_handle().flush_inbound_data()
session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
self.keepalive_loop(session)
+
self.num_requests += 1
+
+ osrf.log.log_internal("server: child done processing message")
+
if self.num_requests == self.controller.max_requests:
break
+
+ # tell the parent we're done w/ this request session
self.send_status()
+
except KeyboardInterrupt:
pass
@@ -335,12 +382,14 @@
status = session.wait(keepalive)
if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
- osrf.log.log_internal("client sent disconnect, exiting keepalive")
+ osrf.log.log_internal("server: client sent disconnect, exiting keepalive")
break
if status is None: # no msg received before keepalive timeout expired
- osrf.log.log_info("No request was received in %d seconds, exiting stateful session" % int(keepalive));
+ osrf.log.log_info(
+ "server: no request was received in %d seconds from %s, exiting stateful session" % (
+ session.remote_id, int(keepalive)));
session.send_status(
session.thread,
More information about the opensrf-commits mailing list