[Opensrf-commits] r1570 - in trunk: include/opensrf src/libopensrf

svn at svn.open-ils.org svn at svn.open-ils.org
Mon Jan 5 12:36:45 EST 2009


Author: scottmk
Date: 2009-01-05 12:36:42 -0500 (Mon, 05 Jan 2009)
New Revision: 1570

Modified:
   trunk/include/opensrf/transport_client.h
   trunk/include/opensrf/transport_message.h
   trunk/src/libopensrf/transport_client.c
   trunk/src/libopensrf/transport_message.c
Log:
This update restructures the mechanism for queueing incoming transport
messages.  In addition, the update to transport_client.c rearranges the
logic a bit in client_recv().

1. A transport_message now carries a pointer to be used in a linked list.
It is initialized to NULL when the message is created.  We no longer use
a separately allocated list node to carry the message.

2. The queue of transport_messages no longer starts with a dummy node.

3. Instead of finding the tail of the queue by traversing the list from
the head, we maintain a separate pointer to the tail node.  Thus the
enqueueing operation occurs in constant time instead of linear time.

4. In client_recv: we now have the dequeueing code in a single place,
instead of duplicating it.

5. In client_recv: I eliminated some conditional compilation that made
no real difference, since both branches of the #ifdef were effectively
identical.

6. In client_recv: changed both loops from while loops to do-while
loops, since in each case we want to perform at least one iteration.


Modified: trunk/include/opensrf/transport_client.h
===================================================================
--- trunk/include/opensrf/transport_client.h	2009-01-05 17:05:45 UTC (rev 1569)
+++ trunk/include/opensrf/transport_client.h	2009-01-05 17:36:42 UTC (rev 1570)
@@ -12,7 +12,8 @@
 // Our client struct.  We manage a list of messages and a controlling session
 // ---------------------------------------------------------------------------
 struct transport_client_struct {
-	struct message_list_struct* m_list;
+	transport_message* msg_q_head;
+	transport_message* msg_q_tail;
 	transport_session* session;
 	int error;
 };

Modified: trunk/include/opensrf/transport_message.h
===================================================================
--- trunk/include/opensrf/transport_message.h	2009-01-05 17:05:45 UTC (rev 1569)
+++ trunk/include/opensrf/transport_message.h	2009-01-05 17:36:42 UTC (rev 1570)
@@ -34,6 +34,7 @@
 	int error_code;
 	int broadcast;
 	char* msg_xml; /* the entire message as XML complete with entity encoding */
+	struct transport_message_struct* next;
 };
 typedef struct transport_message_struct transport_message;
 
@@ -55,13 +56,6 @@
 void message_set_osrf_xid( transport_message* msg, const char* osrf_xid );
 
 // ---------------------------------------------------------------------------------
-// Formats the Jabber message as XML for encoding. 
-// Returns NULL on error
-// ---------------------------------------------------------------------------------
-char* message_to_xml( const transport_message* msg );
-
-
-// ---------------------------------------------------------------------------------
 // Call this to create the encoded XML for sending on the wire.
 // This is a seperate function so that encoding will not necessarily have
 // to happen on all messages (i.e. typically only occurs outbound messages).
@@ -75,11 +69,6 @@
 int message_free( transport_message* msg );
 
 // ---------------------------------------------------------------------------------
-// Prepares the shared XML document
-// ---------------------------------------------------------------------------------
-//int message_init_xml();
-
-// ---------------------------------------------------------------------------------
 // Determines the username of a Jabber ID.  This expects a pre-allocated char 
 // array for the return value.
 // ---------------------------------------------------------------------------------

Modified: trunk/src/libopensrf/transport_client.c
===================================================================
--- trunk/src/libopensrf/transport_client.c	2009-01-05 17:05:45 UTC (rev 1569)
+++ trunk/src/libopensrf/transport_client.c	2009-01-05 17:36:42 UTC (rev 1570)
@@ -1,21 +1,5 @@
 #include <opensrf/transport_client.h>
 
-#define MESSAGE_LIST_HEAD 1
-#define MESSAGE_LIST_ITEM 2
-
-// ---------------------------------------------------------------------------
-// Represents a node in a linked list.  The node holds a pointer to the next
-// node (which is null unless set), a pointer to a transport_message, and
-// and a type variable (which is not really curently necessary).
-// ---------------------------------------------------------------------------
-struct message_list_struct {
-	struct message_list_struct* next;
-	transport_message* message;
-	int type;
-};
-typedef struct message_list_struct transport_message_list;
-typedef struct message_list_struct transport_message_node;
-
 static void client_message_handler( void* client, transport_message* msg );
 
 //int main( int argc, char** argv );
@@ -69,15 +53,11 @@
 	/* build and clear the client object */
 	transport_client* client = safe_malloc( sizeof( transport_client) );
 
-	/* build and clear the message list */
-	client->m_list = safe_malloc( sizeof( transport_message_list ) );
-
-	client->m_list->next = NULL;
-	client->m_list->message = NULL;
-	client->m_list->type = MESSAGE_LIST_HEAD;
-
-	/* build the session */
+	/* start with an empty message queue */
+	client->msg_q_head = NULL;
+	client->msg_q_tail = NULL;
 	
+	/* build the session */
 	client->session = init_transport( server, port, unix_path, client, component );
 
 	client->session->message_callback = client_message_handler;
@@ -116,85 +96,63 @@
 transport_message* client_recv( transport_client* client, int timeout ) {
 	if( client == NULL ) { return NULL; }
 
-	transport_message_node* node;
-	transport_message* msg;
+	int error = 0;  /* boolean */
 
+	if( NULL == client->msg_q_head ) {
 
-	/* see if there are any message in the messages queue */
-	if( client->m_list->next != NULL ) {
-		/* pop off the first one... */
-		node = client->m_list->next;
-		client->m_list->next = node->next;
-		msg = node->message;
-		free( node );
-		return msg;
-	}
+		/* no message available?  try to get one */
+		if( timeout == -1 ) {  /* wait potentially forever for data to arrive */
 
-	if( timeout == -1 ) {  /* wait potentially forever for data to arrive */
-
-		while( client->m_list->next == NULL ) {
-		//	if( ! session_wait( client->session, -1 ) ) {
 			int x;
-			if( (x = session_wait( client->session, -1 )) ) {
-				osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
-				client->error = 1;
-				return NULL;
-			}
-		}
+			do {
+				if( (x = session_wait( client->session, -1 )) ) {
+					osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
+					error = 1;
+					break;
+				}
+			} while( client->msg_q_head == NULL );
 
-	} else { /* wait at most timeout seconds */
+		} else {    /* loop up to 'timeout' seconds waiting for data to arrive  */
 
-	
-		/* if not, loop up to 'timeout' seconds waiting for data to arrive */
-		time_t start = time(NULL);	
-		time_t remaining = (time_t) timeout;
+			/* This loop assumes that a time_t is denominated in seconds -- not */
+			/* guaranteed by Standard C, but a fair bet for Linux or UNIX       */
 
-		int counter = 0;
+			time_t start = time(NULL);
+			time_t remaining = (time_t) timeout;
 
-		int wait_ret;
-		while( client->m_list->next == NULL && remaining >= 0 ) {
+			int wait_ret;
+			do {
+				if( (wait_ret = session_wait( client->session, (int) remaining)) ) {
+					error = 1;
+					osrfLogDebug(OSRF_LOG_MARK,
+						"session_wait returned failure code %d: setting error=1\n", wait_ret);
+					break;
+				}
 
-			if( (wait_ret= session_wait( client->session, remaining)) ) {
-				client->error = 1;
-				osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d: setting error=1\n", wait_ret);
-				return NULL;
-			}
-
-			++counter;
-
-#ifdef _ROUTER
-			// session_wait returns -1 if there is no more data and we're a router
-			if( remaining == 0 ) { // && wait_ret == -1 ) {
-				break;
-			}
-#else
-			if( remaining == 0 ) // or infinite loop
-				break;
-#endif
-
-			remaining -= (int) (time(NULL) - start);
+				remaining -= time(NULL) - start;
+			} while( NULL == client->msg_q_head && remaining > 0 );
 		}
-
 	}
+	
+	transport_message* msg = NULL;
 
-	/* again, see if there are any messages in the message queue */
-	if( client->m_list->next != NULL ) {
-		/* pop off the first one... */
-		node = client->m_list->next;
-		client->m_list->next = node->next;
-		msg = node->message;
-		free( node );
-		return msg;
-
-	} else {
-		return NULL;
+	if( error )
+		client->error = 1;
+	else if( client->msg_q_head != NULL ) {
+		/* got message(s); dequeue the oldest one */
+		msg = client->msg_q_head;
+		client->msg_q_head = msg->next;
+		msg->next = NULL;  /* shouldn't be necessary; nullify for good hygiene */
+		if( NULL == client->msg_q_head )
+			client->msg_q_tail = NULL;
 	}
+
+	return msg;
 }
 
 // ---------------------------------------------------------------------------
 // This is the message handler required by transport_session.  This handler
-// takes all incoming messages and puts them into the back of a linked list
-// of messages.
+// takes an incoming message and adds it to the tail of a message queue.
 // ---------------------------------------------------------------------------
 static void client_message_handler( void* client, transport_message* msg ){
 
@@ -203,21 +161,14 @@
 
 	transport_client* cli = (transport_client*) client;
 
-	transport_message_node* node = safe_malloc( sizeof( transport_message_node) );
-	node->next = NULL;
-	node->type = MESSAGE_LIST_ITEM;
-	node->message = msg;
-
-
-	/* find the last node and put this onto the end */
-	transport_message_node* tail = cli->m_list;
-	transport_message_node* current = tail->next;
-
-	while( current != NULL ) {
-		tail = current;
-		current = current->next;
+	/* add the new message to the tail of the queue */
+	if( NULL == cli->msg_q_head )
+		cli->msg_q_tail = cli->msg_q_head = msg;
+	else {
+		cli->msg_q_tail->next = msg;
+		cli->msg_q_tail = msg;
 	}
-	tail->next = node;
+	msg->next = NULL;
 }
 
 
@@ -225,18 +176,16 @@
 	if(client == NULL) return 0; 
 
 	session_free( client->session );
-	transport_message_node* current = client->m_list->next;
-	transport_message_node* next;
+	transport_message* current = client->msg_q_head;
+	transport_message* next;
 
 	/* deallocate the list of messages */
 	while( current != NULL ) {
 		next = current->next;
-		message_free( current->message );
-		free(current);
+		message_free( current );
 		current = next;
 	}
 
-	free( client->m_list );
 	free( client );
 	return 1;
 }

Modified: trunk/src/libopensrf/transport_message.c
===================================================================
--- trunk/src/libopensrf/transport_message.c	2009-01-05 17:05:45 UTC (rev 1569)
+++ trunk/src/libopensrf/transport_message.c	2009-01-05 17:36:42 UTC (rev 1570)
@@ -1,6 +1,5 @@
 #include <opensrf/transport_message.h>
 
-
 // ---------------------------------------------------------------------------------
 // Allocates and initializes a new transport_message
 // ---------------------------------------------------------------------------------
@@ -45,6 +44,7 @@
 	msg->error_code     = 0;
 	msg->broadcast      = 0;
 	msg->msg_xml        = NULL;
+	msg->next           = NULL;
 
 	return msg;
 }
@@ -72,6 +72,7 @@
 	new_msg->error_code     = 0;
 	new_msg->broadcast      = 0;
 	new_msg->msg_xml        = NULL;
+	new_msg->next           = NULL;
 
 	xmlKeepBlanksDefault(0);
 	xmlDocPtr msg_doc = xmlReadDoc( BAD_CAST msg_xml, NULL, NULL, 0 );
@@ -160,65 +161,45 @@
 		new_msg->body = strdup("");
 
 	new_msg->msg_xml = xmlDocToString(msg_doc, 0);
-   xmlFreeDoc(msg_doc);
-   xmlCleanupParser();
+	xmlFreeDoc(msg_doc);
+	xmlCleanupParser();
 
 	return new_msg;
 }
 
 void message_set_osrf_xid( transport_message* msg, const char* osrf_xid ) {
-   if(!msg) return;
-   if( osrf_xid )
-      msg->osrf_xid = strdup(osrf_xid);
-   else msg->osrf_xid = strdup("");
+	if( msg ) {
+		if( msg->osrf_xid ) free( msg->osrf_xid );
+		msg->osrf_xid = strdup( osrf_xid ? osrf_xid : "" );
+	}
 }
 
 void message_set_router_info( transport_message* msg, const char* router_from,
 		const char* router_to, const char* router_class, const char* router_command,
 		int broadcast_enabled ) {
 
-	if( !msg ) return;
-	
-	if(router_from)
-		msg->router_from		= strdup(router_from);
-	else
-		msg->router_from		= strdup("");
+	if( msg ) {
 
-	if(router_to)
-		msg->router_to			= strdup(router_to);
-	else
-		msg->router_to			= strdup("");
+		/* free old values, if any */
+		if( msg->router_from    ) free( msg->router_from );
+		if( msg->router_to      ) free( msg->router_to );
+		if( msg->router_class   ) free( msg->router_class );
+		if( msg->router_command ) free( msg->router_command );
 
-	if(router_class)
-		msg->router_class		= strdup(router_class);
-	else 
-		msg->router_class		= strdup("");
-	
-	if(router_command)
-		msg->router_command	= strdup(router_command);
-	else
-		msg->router_command	= strdup("");
+		/* install new values */
+		msg->router_from     = strdup( router_from     ? router_from     : "" );
+		msg->router_to       = strdup( router_to       ? router_to       : "" );
+		msg->router_class    = strdup( router_class    ? router_class    : "" );
+		msg->router_command  = strdup( router_command  ? router_command  : "" );
+		msg->broadcast = broadcast_enabled;
 
-	msg->broadcast = broadcast_enabled;
-
-	if( msg->router_from == NULL || msg->router_to == NULL ||
-			msg->router_class == NULL || msg->router_command == NULL ) 
-		osrfLogError(OSRF_LOG_MARK,  "message_set_router_info(): Out of Memory" );
-
-	return;
+		if( msg->router_from == NULL || msg->router_to == NULL ||
+				msg->router_class == NULL || msg->router_command == NULL ) 
+			osrfLogError(OSRF_LOG_MARK,  "message_set_router_info(): Out of Memory" );
+	}
 }
 
 
-
-/* encodes the message for traversal */
-int message_prepare_xml( transport_message* msg ) {
-	if( !msg ) return 0;
-	if( msg->msg_xml == NULL )
-		msg->msg_xml = message_to_xml( msg );
-	return 1;
-}
-
-
 // ---------------------------------------------------------------------------------
 //
 // ---------------------------------------------------------------------------------
@@ -240,16 +221,16 @@
 	free(msg);
 	return 1;
 }
-	
+
+
 // ---------------------------------------------------------------------------------
-// Allocates a char* holding the XML representation of this jabber message
+// Encodes the message as XML for traversal; stores in msg_xml member
 // ---------------------------------------------------------------------------------
-char* message_to_xml( const transport_message* msg ) {
+int message_prepare_xml( transport_message* msg ) {
 
-	//int			bufsize;
-	//xmlChar*		xmlbuf;
-	//char*			encoded_body;
-
+	if( !msg ) return 0;
+	if( msg->msg_xml ) return 1;   /* already done */
+	
 	xmlNodePtr	message_node;
 	xmlNodePtr	body_node;
 	xmlNodePtr	thread_node;
@@ -260,11 +241,6 @@
 
 	xmlKeepBlanksDefault(0);
 
-	if( ! msg ) { 
-		osrfLogWarning(OSRF_LOG_MARK,  "Passing NULL message to message_to_xml()"); 
-		return NULL; 
-	}
-
 	doc = xmlReadDoc( BAD_CAST "<message/>", NULL, NULL, XML_PARSE_NSCLEAN );
 	message_node = xmlDocGetRootElement(doc);
 
@@ -318,11 +294,13 @@
 
 	xmlBufferPtr xmlbuf = xmlBufferCreate();
 	xmlNodeDump( xmlbuf, doc, xmlDocGetRootElement(doc), 0, 0);
-	char* xml = strdup((const char*) (xmlBufferContent(xmlbuf)));
+	msg->msg_xml = strdup((const char*) (xmlBufferContent(xmlbuf)));
+	
 	xmlBufferFree(xmlbuf);
 	xmlFreeDoc( doc );		 
 	xmlCleanupParser();
-	return xml;
+	
+	return 1;
 }
 
 



More information about the opensrf-commits mailing list