[Opensrf-commits] r1703 - trunk/src/python/osrf (erickson)

svn at svn.open-ils.org svn at svn.open-ils.org
Tue May 19 10:12:42 EDT 2009


Author: erickson
Date: 2009-05-19 10:12:40 -0400 (Tue, 19 May 2009)
New Revision: 1703

Modified:
   trunk/src/python/osrf/server.py
   trunk/src/python/osrf/ses.py
   trunk/src/python/osrf/stack.py
Log:
added final bits for stateful sessions (drone keepalive)

Modified: trunk/src/python/osrf/server.py
===================================================================
--- trunk/src/python/osrf/server.py	2009-05-18 13:50:13 UTC (rev 1702)
+++ trunk/src/python/osrf/server.py	2009-05-19 14:12:40 UTC (rev 1703)
@@ -19,7 +19,7 @@
 # -----------------------------------------------------------------------
 
 import os, sys, threading, logging, fcntl, socket, errno, signal, time
-import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app
+import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app, osrf.const
 
 
 # used to define the size of the PID/size leader in 
@@ -42,6 +42,7 @@
         self.children = [] # list of children
         self.osrf_handle = None # xmpp handle
         self.routers = [] # list of registered routers
+        self.keepalive = 0 # how long to wait for subsequent, stateful requests
 
         # Global status socketpair.  All children relay their 
         # availability info to the parent through this socketpair. 
@@ -296,7 +297,8 @@
                 size = int(self.read_data.recv(SIZE_PAD) or 0)
                 data = self.read_data.recv(size)
                 osrf.log.log_internal("recv'd data " + data)
-                osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
+                session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
+                self.keepalive_loop(session)
                 self.num_requests += 1
                 if self.num_requests == self.controller.max_requests:
                     break
@@ -306,6 +308,35 @@
         # run the exit handler
         osrf.app.Application.application.child_exit()
 
+    def keepalive_loop(self, session):
+        keepalive = self.controller.keepalive
+
+        while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED:
+
+            start = time.time()
+            session.wait(keepalive)
+            end = time.time()
+
+            if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
+                osrf.log.log_internal("client sent disconnect, exiting keepalive")
+                break
+
+            if (end - start) >= keepalive: # exceeded keepalive timeout
+
+                osrf.log.log_info("No request was received in %d seconds, exiting stateful session" % int(keepalive));
+
+                session.send_status(
+                    session.thread, 
+                    osrf.net_obj.NetworkObject.osrfConnectStatus({   
+                        'status' : 'Disconnected on timeout',
+                        'statusCode': osrf.const.OSRF_STATUS_TIMEOUT
+                    })
+                )
+
+                break
+
+        return
+
     def send_status(self):
         ''' Informs the controller that we are done processing this request '''
         fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_EX)

Modified: trunk/src/python/osrf/ses.py
===================================================================
--- trunk/src/python/osrf/ses.py	2009-05-18 13:50:13 UTC (rev 1702)
+++ trunk/src/python/osrf/ses.py	2009-05-19 14:12:40 UTC (rev 1703)
@@ -46,6 +46,7 @@
         self.thread = None
         self.service = None
 
+
     @staticmethod
     def find_or_create(thread):
         if thread in Session.session_cache:
@@ -114,6 +115,7 @@
         # cache this session in the global session cache
         Session.session_cache[self.thread] = self
 
+
     def reset_request_timeout(self, rid):
         req = self.find_request(rid)
         if req:
@@ -334,6 +336,7 @@
     def __init__(self, thread):
         Session.__init__(self)
         self.thread = thread
+        Session.session_cache[thread] = self
 
     def send_status(self, thread_trace, payload):
         self.send(

Modified: trunk/src/python/osrf/stack.py
===================================================================
--- trunk/src/python/osrf/stack.py	2009-05-18 13:50:13 UTC (rev 1702)
+++ trunk/src/python/osrf/stack.py	2009-05-19 14:12:40 UTC (rev 1703)
@@ -36,6 +36,8 @@
     if isinstance(ses, osrf.ses.ServerSession):
         osrf.log.log_info("Message processing duration %f" % duration)
 
+    return ses
+
 def handle_message(session, message):
 
     osrf.log.log_internal("handle_message(): processing message of "
@@ -104,7 +106,7 @@
 
     if message.type() == osrf.const.OSRF_MESSAGE_TYPE_CONNECT:
         osrf.log.log_debug("server received CONNECT from %s" % session.remote_id)
-        session.state == osrf.const.OSRF_APP_SESSION_CONNECTED 
+        session.state = osrf.const.OSRF_APP_SESSION_CONNECTED 
         session.send_connect_ok(message.threadTrace())
         return
 



More information about the opensrf-commits mailing list