[Opensrf-commits] r1881 - in trunk: include/opensrf src/libopensrf (scottmk)

svn at svn.open-ils.org svn at svn.open-ils.org
Fri Jan 1 09:49:18 EST 2010


Author: scottmk
Date: 2010-01-01 09:49:16 -0500 (Fri, 01 Jan 2010)
New Revision: 1881

Modified:
   trunk/include/opensrf/osrf_app_session.h
   trunk/src/libopensrf/osrf_app_session.c
Log:
Changed the way pending requests are stored in an osrfAppSession.

Before, pending requests were stored in a so-called request_queue.
However it wasn't a queue at all, except in name.  It was an
osrfList, i.e. an expandable pointer array used as a random
access container.  Request ids were used as subscripts into the
array.

Since we don't reuse request ids (except in the theoretical case
of a wraparound), the array grew without limit.  This unbounded
growth in the memory footprint could create problems for a
long-running busy process.  It might have contributed to the
rumored instabililty of chopchop, our homegrown Jabber server.

Now, pending requests are stored in a hash table, where each
of 64 slots holds a doubly linked list.  There should be no
effect on performance unless there are hundreds or thousands
of pending requests at once, in which case we would probably
get bogged down anyway.

M    include/opensrf/osrf_app_session.h
M    src/libopensrf/osrf_app_session.c


Modified: trunk/include/opensrf/osrf_app_session.h
===================================================================
--- trunk/include/opensrf/osrf_app_session.h	2009-12-29 13:22:49 UTC (rev 1880)
+++ trunk/include/opensrf/osrf_app_session.h	2010-01-01 14:49:16 UTC (rev 1881)
@@ -33,6 +33,11 @@
 	OSRF_SESSION_CLIENT
 };
 
+struct osrf_app_request_struct;
+typedef struct osrf_app_request_struct osrfAppRequest;
+
+#define OSRF_REQUEST_HASH_SIZE 64
+
 /**
 	@brief Representation of a session with another application.
 
@@ -44,8 +49,6 @@
 
 	/** Our messag passing object */
 	transport_client* transport_handle;
-	/** Cache of active app_request objects */
-	osrfList* request_queue;
 
 	/** The original remote id of the remote service we're talking to */
 	char* orig_remote_id;
@@ -61,7 +64,7 @@
 	/** Our ID */
 	char* session_id;
 
-	/* true if this session does not require connect messages */
+	/** true if this session does not require connect messages */
 	int stateless;
 
 	/** The connect state */
@@ -73,12 +76,15 @@
 	/** the current locale for this session **/
 	char* session_locale;
 
-	/* let the user use the session to store their own session data */
+	/** let the user use the session to store their own session data */
 	void* userData;
 
 	void (*userDataFree) (void*);
 
     int transport_error;
+
+	/** Hash table of pending requests. */
+	osrfAppRequest* request_hash[ OSRF_REQUEST_HASH_SIZE ];
 };
 typedef struct osrf_app_session_struct osrfAppSession;
 
@@ -114,7 +120,7 @@
 /** Builds a new app_request object with the given payload and returns
 	the id of the request.  This id is then used to perform work on the
 	request.
- */
+*/
 int osrfAppSessionSendRequest(
 		 osrfAppSession* session, const jsonObject* params,
 		 const char* method_name, int protocol );

Modified: trunk/src/libopensrf/osrf_app_session.c
===================================================================
--- trunk/src/libopensrf/osrf_app_session.c	2009-12-29 13:22:49 UTC (rev 1880)
+++ trunk/src/libopensrf/osrf_app_session.c	2010-01-01 14:49:16 UTC (rev 1881)
@@ -25,9 +25,16 @@
 	/** Boolean; if true, then a call that is waiting on a response will reset the
 	timeout and set this variable back to false. */
 	int reset_timeout;
+	/** Linkage pointers for a linked list.  We maintain a hash table of pending requests,
+	    and each slot of the hash table is a doubly linked list. */
+	osrfAppRequest* next;
+	osrfAppRequest* prev;
 };
-typedef struct osrf_app_request_struct osrfAppRequest;
 
+static inline unsigned int request_id_hash( int req_id );
+static osrfAppRequest* find_app_request( const osrfAppSession* session, int req_id );
+static void add_app_request( osrfAppSession* session, osrfAppRequest* req );
+
 /* Send the given message */
 static int _osrf_app_session_send( osrfAppSession*, osrfMessage* msg );
 
@@ -52,14 +59,12 @@
 	@return Pointer to the new osrfAppRequest.
 
 	The calling code is responsible for freeing the osrfAppRequest by calling
-	_osrf_app_request_free().  In practice this normally happens via a callback installed
-	in an osrfList.
+	_osrf_app_request_free().
 */
 static osrfAppRequest* _osrf_app_request_init(
 		osrfAppSession* session, osrfMessage* msg ) {
 
-	osrfAppRequest* req =
-		(osrfAppRequest*) safe_malloc(sizeof(osrfAppRequest));
+	osrfAppRequest* req = safe_malloc(sizeof(osrfAppRequest));
 
 	req->session        = session;
 	req->request_id     = msg->thread_trace;
@@ -67,6 +72,8 @@
 	req->payload        = msg;
 	req->result         = NULL;
 	req->reset_timeout  = 0;
+	req->next           = NULL;
+	req->prev           = NULL;
 
 	return req;
 }
@@ -75,25 +82,21 @@
 /**
 	@brief Free an osrfAppRequest and everything it owns.
 	@param req Pointer to an osrfAppRequest, cast to a void pointer.
-
-	This function is installed as a callback in the osrfList request_queue, a member
-	of osrfAppSession.
 */
-static void _osrf_app_request_free( void * req ){
+static void _osrf_app_request_free( osrfAppRequest * req ) {
 	if( req ) {
-		osrfAppRequest* r = (osrfAppRequest*) req;
-		if( r->payload )
-			osrfMessageFree( r->payload );
+		if( req->payload )
+			osrfMessageFree( req->payload );
 
 		/* Free the messages in the result queue */
 		osrfMessage* next_msg;
-		while( r->result ) {
-			next_msg = r->result->next;
-			osrfMessageFree( r->result );
-			r->result = next_msg;
+		while( req->result ) {
+			next_msg = req->result->next;
+			osrfMessageFree( req->result );
+			req->result = next_msg;
 		}
 
-		free( r );
+		free( req );
 	}
 }
 
@@ -130,21 +133,85 @@
 	@brief Remove an osrfAppRequest (identified by request_id) from an osrfAppSession.
 	@param session Pointer to the osrfAppSession that owns the osrfAppRequest.
 	@param req_id request_id of the osrfAppRequest to be removed.
-
-	The osrfAppRequest itself is freed by a callback installed in the osrfList where it resides.
 */
-void osrf_app_session_request_finish(
-		osrfAppSession* session, int req_id ){
+void osrf_app_session_request_finish( osrfAppSession* session, int req_id ) {
 
 	if( session ) {
-		osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
-		if(req)
-			osrfListRemove( req->session->request_queue, req->request_id );
+		// Search the hash table for the request in question
+		unsigned int index = request_id_hash( req_id );
+		osrfAppRequest* old_req = session->request_hash[ index ];
+		while( old_req ) {
+			if( old_req->request_id == req_id )
+				break;
+			else
+				old_req = old_req->next;
+		}
+
+		if( old_req ) {
+			// Remove the request from the doubly linked list
+			if( old_req->prev )
+				old_req->prev->next = old_req->next;
+			else
+				session->request_hash[ index ] = old_req->next;
+
+			if( old_req->next )
+				old_req->next->prev = old_req->prev;
+
+			_osrf_app_request_free( old_req );
+		}
 	}
 }
 
+/**
+	@brief Derive a hash key from a request id.
+	@param req_id The request id.
+	@return The corresponding hash key; an index into request_hash[].
 
+	If OSRF_REQUEST_HASH_SIZE is a power of two, then this calculation should
+	reduce to a binary AND.
+*/
+static inline unsigned int request_id_hash( int req_id ) {
+	return ((unsigned int) req_id ) % OSRF_REQUEST_HASH_SIZE;
+}
+
 /**
+	@brief Search for an osrfAppRequest in the hash table, given a request id.
+	@param session Pointer to the relevant osrfAppSession.
+	@param req_id The request_id of the osrfAppRequest being sought.
+	@return A pointer to the osrfAppRequest if found, or NULL if not.
+*/
+static osrfAppRequest* find_app_request( const osrfAppSession* session, int req_id ) {
+
+	osrfAppRequest* req = session->request_hash[ request_id_hash( req_id) ];
+	while( req ) {
+		if( req->request_id == req_id )
+			break;
+		else
+			req = req->next;
+	}
+
+	return req;
+}
+
+/**
+	@brief Add an osrfAppRequest to the hash table of a given osrfAppSession.
+	@param session Pointer to the session to which the request belongs.
+	@param req Pointer to the osrfAppRequest to be stored.
+
+	Find the right spot in the hash table; then add the request to the linked list at that
+	spot.  We just add it to the head of the list, without trying to maintain any particular
+	ordering.
+*/
+static void add_app_request( osrfAppSession* session, osrfAppRequest* req ) {
+	if( session && req ) {
+		unsigned int index = request_id_hash( req->request_id );
+		req->next = session->request_hash[ index ];
+		req->prev = NULL;
+		session->request_hash[ index ] = req;
+	}
+}
+
+/**
 	@brief Set the timeout for a request to one second.
 	@param session Pointer to the relevant osrfAppSession.
 	@param req_id Request ID of the request whose timeout is to be reset.
@@ -155,7 +222,7 @@
 	if(session == NULL)
 		return;
 	osrfLogDebug( OSRF_LOG_MARK, "Resetting request timeout %d", req_id );
-	osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
+	osrfAppRequest* req = find_app_request( session, req_id );
 	if( req )
 		req->reset_timeout = 1;
 }
@@ -371,8 +438,6 @@
 		return NULL;
 	}
 
-	session->request_queue = osrfNewList();
-	session->request_queue->freeItem = &_osrf_app_request_free;
 	session->remote_id = strdup(target_buf);
 	session->orig_remote_id = strdup(session->remote_id);
 	session->remote_service = strdup(remote_service);
@@ -402,6 +467,12 @@
 	session->userData = NULL;
 	session->userDataFree = NULL;
 
+	// Initialize the hash table
+	int i;
+
+	for( i = 0; i < OSRF_REQUEST_HASH_SIZE; ++i )
+		session->request_hash[ i ] = NULL;
+
 	_osrf_app_session_push_session( session );
 	return session;
 }
@@ -429,9 +500,6 @@
 	if(statel) stateless = atoi(statel);
 	free(statel);
 
-
-	session->request_queue = osrfNewList();
-	session->request_queue->freeItem = &_osrf_app_request_free;
 	session->remote_id = strdup(remote_id);
 	session->orig_remote_id = strdup(remote_id);
 	session->session_id = strdup(session_id);
@@ -452,9 +520,14 @@
 	session->userData = NULL;
 	session->userDataFree = NULL;
 
+	// Initialize the hash table
+	int i;
+
+	for( i = 0; i < OSRF_REQUEST_HASH_SIZE; ++i )
+		session->request_hash[ i ] = NULL;
+
 	_osrf_app_session_push_session( session );
 	return session;
-
 }
 
 /**
@@ -488,7 +561,7 @@
 		osrfAppSession* session, const jsonObject* params,
 		const char* method_name, int protocol, osrfStringArray* param_strings ) {
 
-	osrfLogWarning( OSRF_LOG_MARK, "Function osrfAppSessionMakeRequest() is deprecated; "
+	osrfLogWarning( OSRF_LOG_MARK, "Function osrfAppSessionMakeRequest() is deprecasted; "
 			"call osrfAppSessionSendRequest() instead" );
 	return osrfAppSessionMakeLocaleRequest( session, params,
 			method_name, protocol, param_strings, NULL );
@@ -572,7 +645,7 @@
 
 	osrfLogDebug( OSRF_LOG_MARK,  "Pushing [%d] onto request queue for session [%s] [%s]",
 			req->request_id, session->remote_service, session->session_id );
-	osrfListSet( session->request_queue, req, req->request_id );
+	add_app_request( session, req );
 	return req->request_id;
 }
 
@@ -580,14 +653,14 @@
 	if(session == NULL)
 		return;
 
-	osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, request_id );
+	osrfAppRequest* req = find_app_request( session, request_id );
 	if(req) req->complete = 1;
 }
 
 int osrf_app_session_request_complete( const osrfAppSession* session, int request_id ) {
 	if(session == NULL)
 		return 0;
-	osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, request_id );
+	osrfAppRequest* req = find_app_request( session, request_id );
 	if(req)
 		return req->complete;
 	return 0;
@@ -629,7 +702,7 @@
 		osrfAppSession* session, osrfMessage* msg ){
 	if(session == NULL || msg == NULL) return 0;
 
-	osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, msg->thread_trace );
+	osrfAppRequest* req = find_app_request( session, msg->thread_trace );
 	if(req == NULL) return 0;
 	_osrf_app_request_push_queue( req, msg );
 
@@ -709,7 +782,7 @@
 }
 
 int osrf_app_session_request_resend( osrfAppSession* session, int req_id ) {
-	osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
+	osrfAppRequest* req = find_app_request( session, req_id );
 	return _osrf_app_request_resend( req );
 }
 
@@ -824,7 +897,12 @@
 	free(session->orig_remote_id);
 	free(session->session_id);
 	free(session->remote_service);
-	osrfListFree(session->request_queue);
+	
+	// Free the request hash
+	int i;
+	for( i = 0; i < OSRF_REQUEST_HASH_SIZE; ++i ) {
+		_osrf_app_request_free( session->request_hash[ i ] );
+	}
 	free(session);
 }
 
@@ -832,12 +910,10 @@
 		osrfAppSession* session, int req_id, int timeout ) {
 	if(req_id < 0 || session == NULL)
 		return NULL;
-	osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
+	osrfAppRequest* req = find_app_request( session, req_id );
 	return _osrf_app_request_recv( req, timeout );
 }
 
-
-
 int osrfAppRequestRespond( osrfAppSession* ses, int requestId, const jsonObject* data ) {
 	if(!ses || ! data ) return -1;
 



More information about the opensrf-commits mailing list