[Opensrf-commits] r2124 - trunk/src/python/osrf (erickson)

svn at svn.open-ils.org svn at svn.open-ils.org
Mon Dec 13 09:53:49 EST 2010


Author: erickson
Date: 2010-12-13 09:53:47 -0500 (Mon, 13 Dec 2010)
New Revision: 2124

Modified:
   trunk/src/python/osrf/server.py
Log:
improved select/read/write fault tolerance; cleaner and more efficient child process idle/active list management; improved logging

Modified: trunk/src/python/osrf/server.py
===================================================================
--- trunk/src/python/osrf/server.py	2010-12-07 14:30:45 UTC (rev 2123)
+++ trunk/src/python/osrf/server.py	2010-12-13 14:53:47 UTC (rev 2124)
@@ -1,5 +1,5 @@
 # -----------------------------------------------------------------------
-# Copyright (C) 2008  Equinox Software, Inc.
+# Copyright (C) 2008-2010  Equinox Software, Inc.
 # Bill Erickson <erickson at esilibrary.com>
 #
 # This program is free software; you can redistribute it and/or
@@ -42,10 +42,12 @@
         self.keepalive = 0 # how long to wait for subsequent, stateful requests
         self.active_list = [] # list of active children
         self.idle_list = [] # list of idle children
+        self.pid_map = {} # map of pid -> child object for faster access
 
         # Global status socketpair.  All children relay their 
         # availability info to the parent through this socketpair. 
         self.read_status, self.write_status = socket.socketpair()
+        self.read_status.setblocking(0)
 
     def load_app(self):
         settings = osrf.set.get('activeapps.%s' % self.service)
@@ -54,7 +56,7 @@
     def cleanup(self):
         ''' Closes management sockets, kills children, reaps children, exits '''
 
-        osrf.log.log_info("Shutting down...")
+        osrf.log.log_info("server: shutting down...")
         self.cleanup_routers()
 
         self.read_status.shutdown(socket.SHUT_RDWR)
@@ -67,8 +69,8 @@
             child.write_data.shutdown(socket.SHUT_RDWR)
             child.read_data.close()
             child.write_data.close()
+            os.kill(child.pid, signal.SIGKILL)
 
-        os.kill(0, signal.SIGKILL)
         self.reap_children(True)
         os._exit(0)
 
@@ -101,90 +103,95 @@
         self.register_routers()
 
         try:
-            osrf.log.log_debug("entering main server loop...")
+            osrf.log.log_internal("server: entering main server loop...")
+
             while True: # main server loop
 
                 self.reap_children()
                 self.check_status()
                 data = self.osrf_handle.recv(-1).to_xml()
+                child = None
 
-                if self.try_avail_child(data):
-                    continue
+                if len(self.idle_list) > 0:
+                    child = self.idle_list.pop() 
+                    self.active_list.append(child)
+                    osrf.log.log_internal("server: sending data to available child %d" % child.pid)
 
-                if self.try_new_child(data):
-                    continue
+                elif self.num_children < self.max_children:
+                    child = self.spawn_child(True)
+                    osrf.log.log_internal("server: sending data to new child %d" % child.pid)
 
-                self.try_wait_child()
+                else:
+                    osrf.log.log_warn("server: no children available, waiting...")
+                    child = self.check_status(True)
 
+                self.write_child(child, data)
+
         except KeyboardInterrupt:
+            osrf.log.log_info("server: exiting with keyboard interrupt")
+
+        except Exception, e:
+            osrf.log.log_error("server: exiting with exception: %s" % str(e)) # e.message is deprecated and empty for multi-arg exceptions
+
+        finally:
             self.cleanup()
-        #except Exception, e: 
-            #osrf.log.log_error("server exiting with exception: %s" % e.message)
-            #self.cleanup()
                 
 
-    def try_avail_child(self, data):
-        ''' Trys to send current request data to an available child process '''
+    def write_child(self, child, data):
+        ''' Sends data to the child process '''
 
-        if len(self.idle_list) == 0:
+        try:
+            child.write_data.sendall(data)
+
+        except Exception, e:
+            osrf.log.log_error("server: error sending data to child %d: %s" % (child.pid, str(e)))
+            self.cleanup_child(child.pid, True)
             return False
 
-        child = self.idle_list.pop(0) # remove from idle list
-        osrf.log.log_internal("sending data to available child %d" % child.pid)
-        self.write_child(child, data)
-        self.active_list.insert(0, child) # add to active list
         return True
 
-    def try_new_child(self, data):
-        ''' Tries to spawn a new child to send request data to '''
 
-        osrf.log.log_debug("try_new_child: service=%s num_children=%s max_children=%s" % (self.service, self.num_children, self.max_children))
-        if self.num_children < self.max_children:
-            osrf.log.log_internal("spawning new child to handle data")
-            child = self.spawn_child(True)
-            self.write_child(child, data)
-            return True
-        return False
+    def check_status(self, wait=False):
+        ''' Checks to see if any children have indicated they are done with 
+            their current request.  If wait is true, wait indefinitely 
+            for a child to be free. '''
 
-    def try_wait_child(self, data):
-        ''' Waits for a child to become available '''
+        ret_child = None
 
-        osrf.log.log_warn("No children available, waiting...")
-        child = self.check_status(True)
-        self.write_child(child, data)
+        if wait:
+            self.read_status.setblocking(1)
 
+        while True:
+            pid = None
 
-    def write_child(self, child, data):
-        ''' Sends data to the child process '''
-        # Do we need to watch for sigpipe, etc?
-        child.write_data.sendall(str(len(data)).rjust(SIZE_PAD) + data)
-
-
-    def check_status(self, block=False):
-        ''' Checks to see if any children have indicated they are done with 
-            their current request.  If block is true, this will wait 
-            indefinitely for a child to be free. '''
-
-        pid = None
-        if block:
-            pid = self.read_status.recv(SIZE_PAD)
-        else:
             try:
-                self.read_status.setblocking(0)
                 pid = self.read_status.recv(SIZE_PAD)
+
             except socket.error, e:
-                if e.args[0] != errno.EAGAIN:
-                    raise e
-            self.read_status.setblocking(1)
-                
-        if pid:
-            pid = int(pid)
-            child = [c for c in self.active_list if c.pid == pid][0]
-            self.active_list.remove(child)
-            self.idle_list.insert(0, child)
-            return child
+                if e.args[0] == errno.EAGAIN:
+                    break # no data left to read in nonblocking mode
 
-        return None
+                osrf.log.log_error("server: child status check failed: %s" % str(e))
+                if not wait or ret_child:
+                    break
+
+            finally:
+                if wait and ret_child:
+                    # we've received a status update from at least 
+                    # 1 child.  No need to block anymore.
+                    self.read_status.setblocking(0)
+
+            if pid:
+                child = self.pid_map[int(pid)]
+                osrf.log.log_internal("server: child process %d reporting for duty" % child.pid)
+                if wait and ret_child is None:
+                    # caller is waiting for a free child, leave it in the active list
+                    ret_child = child
+                else:
+                    self.active_list.remove(child)
+                    self.idle_list.append(child)
+
+        return ret_child
         
 
     def reap_children(self, done=False):
@@ -202,26 +209,36 @@
                         self.spawn_children()
                     return
 
-                osrf.log.log_debug("reaping child %d" % pid)
+                osrf.log.log_internal("server: cleaning up child %d" % pid)
                 self.num_children -= 1
+                self.cleanup_child(pid)
 
-                # locate the child in the active or idle list and remove it
-                # Note: typically, a dead child will be in the active list, since 
-                # exiting children do not send a cleanup status to the controller
+            except OSError:
+                return
 
-                child = [c for c in self.active_list if c.pid == pid]
-                if len(child) > 0:
-                    self.active_list.remove(child[0])
-                else:
-                    child = [c for c in self.idle_list if c.pid == pid]
-                    self.idle_list.remove(child[0])
+    def cleanup_child(self, pid, kill=False):
 
-            except OSError:
-                return
+        ''' Drops a child from the pid map and from the active or idle list.
+            If kill is true, the child process is sent SIGKILL first. '''
+
+        if kill:
+            os.kill(pid, signal.SIGKILL)
+
+        # Note: typically, a dead child will be in the active list, since
+        # exiting children do not send a cleanup status to the controller
+        child = self.pid_map.pop(pid, None)
+        if child is None:
+            return # already cleaned up, e.g. killed after a failed write, then reaped
+
+        for child_list in (self.active_list, self.idle_list):
+            if child in child_list:
+                child_list.remove(child)
+                break
+
+            
         
     def spawn_children(self):
         ''' Launches up to min_children child processes '''
-        osrf.log.log_debug("spawn_children: service=%s num_children=%s min_children=%s" % (self.service, self.num_children, self.min_children))
         while self.num_children < self.min_children:
             self.spawn_child()
 
@@ -232,20 +249,21 @@
         child.read_data, child.write_data = socket.socketpair()
         child.pid = os.fork()
 
-        if child.pid:
+        if child.pid: # parent process
             self.num_children += 1
+            self.pid_map[child.pid] = child
             if active:
-                self.active_list.insert(0, child)
+                self.active_list.append(child)
             else:
-                self.idle_list.insert(0, child)
-            osrf.log.log_debug("service %s spawned child %d : %d total" % (self.service, child.pid, self.num_children))
+                self.idle_list.append(child)
+            osrf.log.log_internal("server: %s spawned child %d : %d total" % (self.service, child.pid, self.num_children))
             return child
         else:
             child.pid = os.getpid()
             child.init()
             child.run()
             osrf.net.get_network_handle().disconnect()
-            osrf.log.log_internal("child exiting...")
+            osrf.log.log_internal("server: child exiting...")
             os._exit(0)
 
     def register_routers(self):
@@ -270,7 +288,7 @@
     def register_router(self, target):
         ''' Registers with a single router '''
 
-        osrf.log.log_info("registering with router %s" % target)
+        osrf.log.log_info("server: registering with router %s" % target)
         self.routers.append(target)
 
         reg_msg = osrf.net.NetworkMessage(
@@ -286,7 +304,7 @@
         ''' Un-registers with all connected routers '''
 
         for target in self.routers:
-            osrf.log.log_info("un-registering with router %s" % target)
+            osrf.log.log_info("server: un-registering with router %s" % target)
             unreg_msg = osrf.net.NetworkMessage(
                 recipient = target,
                 body = 'un-registering...',
@@ -310,17 +328,46 @@
         ''' Loops, processing data, until max_requests is reached '''
 
         while True:
+
             try:
-                size = int(self.read_data.recv(SIZE_PAD) or 0)
-                data = self.read_data.recv(size)
-                osrf.log.log_internal("recv'd data " + data)
+
+                self.read_data.setblocking(1)
+                data = ''
+
+                while True: # read all the data from the socket
+
+                    buf = None
+                    try:
+                        buf = self.read_data.recv(2048)
+                    except socket.error, e:
+                        if e.args[0] == errno.EAGAIN:
+                            break
+                        osrf.log.log_error("server: child data read failed: %s" % str(e))
+                        osrf.app.Application.application.child_exit()
+                        return
+
+                    if buf is None or buf == '':
+                        break
+
+                    data += buf
+                    self.read_data.setblocking(0)
+
+                osrf.log.log_internal("server: child received message: " + data)
+
                 osrf.net.get_network_handle().flush_inbound_data()
                 session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
                 self.keepalive_loop(session)
+
                 self.num_requests += 1
+
+                osrf.log.log_internal("server: child done processing message")
+
                 if self.num_requests == self.controller.max_requests:
                     break
+
+                # tell the parent we're done w/ this request session
                 self.send_status()
+
             except KeyboardInterrupt:
                 pass
 
@@ -335,12 +382,14 @@
             status = session.wait(keepalive)
 
             if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
-                osrf.log.log_internal("client sent disconnect, exiting keepalive")
+                osrf.log.log_internal("server: client sent disconnect, exiting keepalive")
                 break
 
             if status is None: # no msg received before keepalive timeout expired
 
-                osrf.log.log_info("No request was received in %d seconds, exiting stateful session" % int(keepalive));
+                osrf.log.log_info(
+                    "server: no request was received in %d seconds from %s, exiting stateful session" % (
+                    int(keepalive), session.remote_id));
 
                 session.send_status(
                     session.thread, 



More information about the opensrf-commits mailing list