[Opensrf-commits] r1969 - in trunk: include/opensrf src/libopensrf (scottmk)

svn at svn.open-ils.org svn at svn.open-ils.org
Thu Jun 24 12:56:49 EDT 2010


Author: scottmk
Date: 2010-06-24 12:56:45 -0400 (Thu, 24 Jun 2010)
New Revision: 1969

Modified:
   trunk/include/opensrf/osrf_app_session.h
   trunk/src/libopensrf/osrf_app_session.c
   trunk/src/libopensrf/osrf_prefork.c
Log:
Provide a mechanism whereby a server drone can terminate immediately,
without waiting for max_requests or a DISCONNECT message.

Motivation: a drone may determine that it is incapacitated and can neither
complete the current request nor service subsequent ones.

In particular, it may lose a database connection.

Mechanism: set a switch in the current osrfAppSession.  In osrf_prefork.c,
inspect this switch after every method call, and bail out if it's set.

M    include/opensrf/osrf_app_session.h
M    src/libopensrf/osrf_app_session.c
M    src/libopensrf/osrf_prefork.c


Modified: trunk/include/opensrf/osrf_app_session.h
===================================================================
--- trunk/include/opensrf/osrf_app_session.h	2010-06-22 15:35:03 UTC (rev 1968)
+++ trunk/include/opensrf/osrf_app_session.h	2010-06-24 16:56:45 UTC (rev 1969)
@@ -84,6 +84,10 @@
 
 	/** Hash table of pending requests. */
 	osrfAppRequest* request_hash[ OSRF_REQUEST_HASH_SIZE ];
+
+	/** Boolean: true if the app wants to terminate the process.  Typically this means that */
+	/** a drone has lost its database connection and can therefore no longer function.      */
+	int panic;
 };
 typedef struct osrf_app_session_struct osrfAppSession;
 
@@ -146,6 +150,8 @@
 
 void osrfAppSessionCleanup( void );
 
+void osrfAppSessionPanic( osrfAppSession* ses );
+
 #ifdef __cplusplus
 }
 #endif

Modified: trunk/src/libopensrf/osrf_app_session.c
===================================================================
--- trunk/src/libopensrf/osrf_app_session.c	2010-06-22 15:35:03 UTC (rev 1968)
+++ trunk/src/libopensrf/osrf_app_session.c	2010-06-24 16:56:45 UTC (rev 1969)
@@ -240,9 +240,13 @@
 	becomes available before the end of the timeout; otherwise NULL;
 
 	If there is already a message available in the input queue for this request, dequeue and
-	return it 	immediately.  Otherwise wait up to timeout seconds until you either get an
+	return it immediately.  Otherwise wait up to timeout seconds until you either get an
 	input message for the specified request, run out of time, or encounter an error.
 
+	If the only message we receive for this request is a STATUS message with a status code
+	OSRF_STATUS_COMPLETE, then return NULL.  That means that the server has nothing further
+	to send in response to this request.
+
 	You may also receive other messages for other requests, and other sessions.  These other
 	messages will be wholly or partially processed behind the scenes while you wait for the
 	one you want.
@@ -271,7 +275,7 @@
 		osrfLogDebug( OSRF_LOG_MARK,  "In app_request receive with remaining time [%d]",
 				(int) remaining );
 
-		
+
 		osrf_app_session_queue_wait( req->session, 0, NULL );
 		if(req->session->transport_error) {
 			osrfLogError(OSRF_LOG_MARK, "Transport error in recv()");
@@ -474,6 +478,7 @@
 	session->remote_service = strdup(remote_service);
 	session->session_locale = NULL;
 	session->transport_error = 0;
+	session->panic = 0;
 
 	#ifdef ASSUME_STATELESS
 	session->stateless = 1;
@@ -813,6 +818,8 @@
 
 	/* defaulting to protocol 1 for now */
 	osrfMessage* con_msg = osrf_message_init( CONNECT, session->thread_trace, 1 );
+
+	// Address this message to the router
 	osrf_app_session_reset_remote( session );
 	session->state = OSRF_SESSION_CONNECTING;
 	int ret = _osrf_app_session_send( session, con_msg );
@@ -999,12 +1006,12 @@
 	message; process subsequent messages if they are available, but don't wait for them.
 
 	The first parameter identifies an osrfApp session, but all we really use it for is to
-	get a pointer to the transport_session.  Typically, if not always, a given process
-	opens only a single transport_session (to talk to the Jabber server), and all app
-	sessions in that process use the same transport_session.
+	get a pointer to the transport_session.  Typically, a given process opens only a single
+	transport_session (to talk to the Jabber server), and all app sessions in that process
+	use the same transport_session.
 
-	Hence this function indiscriminately waits for input messages for all osrfAppSessions,
-	not just the one specified.
+	Hence this function indiscriminately waits for input messages for all osrfAppSessions
+	tied to the same Jabber session, not just the one specified.
 
 	Dispatch each message to the appropriate processing routine, depending on its type
 	and contents, and on whether we're acting as a client or as a server for that message.
@@ -1059,7 +1066,7 @@
 	free(session->orig_remote_id);
 	free(session->session_id);
 	free(session->remote_service);
-	
+
 	// Free the request hash
 	int i;
 	for( i = 0; i < OSRF_REQUEST_HASH_SIZE; ++i ) {
@@ -1192,7 +1199,7 @@
 
 /**
 	@brief Free the global session cache.
-	
+
 	Note that the osrfHash that implements the global session cache does @em not have a
 	callback function installed for freeing its cargo.  As a result, any remaining
 	osrfAppSessions are leaked, along with all the osrfAppRequests and osrfMessages they
@@ -1203,7 +1210,14 @@
 	osrfAppSessionCache = NULL;
 }
 
+/**
+	@brief Set the session's panic flag so that the process terminates after the current method call.
+	@param ses Pointer to the current osrfAppSession.
 
-
-
-
+	Typical use case: a server drone loses its database connection, thereby becoming useless.
+	It terminates so that it will not receive further requests, being unable to service them.
+*/
+void osrfAppSessionPanic( osrfAppSession* ses ) {
+	if( ses )
+		ses->panic = 1;
+}

Modified: trunk/src/libopensrf/osrf_prefork.c
===================================================================
--- trunk/src/libopensrf/osrf_prefork.c	2010-06-22 15:35:03 UTC (rev 1968)
+++ trunk/src/libopensrf/osrf_prefork.c	2010-06-24 16:56:45 UTC (rev 1969)
@@ -93,7 +93,7 @@
 
 static void del_prefork_child( prefork_simple* forker, pid_t pid );
 static void check_children( prefork_simple* forker, int forever );
-static void prefork_child_process_request(prefork_child*, char* data);
+static int  prefork_child_process_request(prefork_child*, char* data);
 static int prefork_child_init_hook(prefork_child*);
 static prefork_child* prefork_child_init( prefork_simple* forker,
 		int read_data_fd, int write_data_fd,
@@ -339,8 +339,10 @@
 }
 
 // Called only by a child process
-static void prefork_child_process_request(prefork_child* child, char* data) {
-	if( !child ) return;
+// Non-zero return code means that the child process has decided to terminate immediately,
+// without waiting for a DISCONNECT or max_requests.
+static int prefork_child_process_request(prefork_child* child, char* data) {
+	if( !child ) return 0;
 
 	transport_client* client = osrfSystemGetTransportClient();
 
@@ -359,11 +361,20 @@
 	transport_message* msg = new_message_from_xml( data );
 
 	osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
-	if(!session) return;
+	if(!session) return 0;
 
+	int rc = session->panic;
+
+	if( rc ) {
+		osrfLogWarning( OSRF_LOG_MARK,
+			"Drone for session %s terminating immediately", session->session_id );
+		osrfAppSessionFree( session );
+		return rc;
+	}
+
 	if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
 		osrfAppSessionFree( session );
-		return;
+		return rc;
 	}
 
 	osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
@@ -383,6 +394,9 @@
 
 		osrfLogDebug(OSRF_LOG_MARK, "Data received == %d", recvd);
 
+		if( session->panic )
+			rc = 1;
+
 		if(retval) {
 			osrfLogError(OSRF_LOG_MARK, "queue-wait returned non-success %d", retval);
 			break;
@@ -404,11 +418,15 @@
 
 			break;
 		}
+
+		// If the child process has decided to terminate immediately
+		if( rc )
+			break;
 	}
 
 	osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
 	osrfAppSessionFree( session );
-	return;
+	return rc;
 }
 
 /**
@@ -909,17 +927,24 @@
 			break;
 		}
 
+		int terminate_now = 0;     // Boolean
+
 		if( n < 0 ) {
 			osrfLogWarning( OSRF_LOG_MARK,
 					"Prefork child read returned error with errno %d", errno );
 			break;
 
 		} else if( gotdata ) {
-			osrfLogDebug(OSRF_LOG_MARK, "Prefork child got a request.. processing..");
-			prefork_child_process_request(child, gbuf->buf);
+			osrfLogDebug( OSRF_LOG_MARK, "Prefork child got a request.. processing.." );
+			terminate_now = prefork_child_process_request( child, gbuf->buf );
 			buffer_reset( gbuf );
 		}
 
+		if( terminate_now ) {
+			osrfLogWarning( OSRF_LOG_MARK, "Prefork child terminating abruptly" );
+			break;
+		}
+
 		if( i < child->max_requests - 1 ) {
 			size_t msg_len = 9;
 			ssize_t len = write(



More information about the opensrf-commits mailing list