[Opensrf-commits] r1570 - in trunk: include/opensrf src/libopensrf
svn at svn.open-ils.org
svn at svn.open-ils.org
Mon Jan 5 12:36:45 EST 2009
Author: scottmk
Date: 2009-01-05 12:36:42 -0500 (Mon, 05 Jan 2009)
New Revision: 1570
Modified:
trunk/include/opensrf/transport_client.h
trunk/include/opensrf/transport_message.h
trunk/src/libopensrf/transport_client.c
trunk/src/libopensrf/transport_message.c
Log:
This update restructures the mechanism for queueing incoming transport
messages. In addition, the update to transport_client.c rearranges the
logic a bit in client_recv().
1. A transport_message now carries a pointer to be used in a linked list.
It is initialized to NULL when the message is created. We no longer use
a separately allocated list node to carry the message.
2. The queue of transport_messages no longer starts with a dummy node.
3. Instead of finding the tail of the queue by traversing the list from
the head, we maintain a separate pointer to the tail node. Thus the
enqueuing operation occurs in constant time instead of linear time.
4. In client_recv: we now have the dequeueing code in a single place,
instead of duplicating it.
5. In client_recv: I eliminated some conditional compilation that made
no real difference, since both branches of the #ifdef were effectively
identical.
6. In client_recv: changed both loops from while loops to do-while
loops, since in each case we want to perform at least one iteration.
Modified: trunk/include/opensrf/transport_client.h
===================================================================
--- trunk/include/opensrf/transport_client.h 2009-01-05 17:05:45 UTC (rev 1569)
+++ trunk/include/opensrf/transport_client.h 2009-01-05 17:36:42 UTC (rev 1570)
@@ -12,7 +12,8 @@
// Our client struct. We manage a list of messages and a controlling session
// ---------------------------------------------------------------------------
struct transport_client_struct {
- struct message_list_struct* m_list;
+ transport_message* msg_q_head;
+ transport_message* msg_q_tail;
transport_session* session;
int error;
};
Modified: trunk/include/opensrf/transport_message.h
===================================================================
--- trunk/include/opensrf/transport_message.h 2009-01-05 17:05:45 UTC (rev 1569)
+++ trunk/include/opensrf/transport_message.h 2009-01-05 17:36:42 UTC (rev 1570)
@@ -34,6 +34,7 @@
int error_code;
int broadcast;
char* msg_xml; /* the entire message as XML complete with entity encoding */
+ struct transport_message_struct* next;
};
typedef struct transport_message_struct transport_message;
@@ -55,13 +56,6 @@
void message_set_osrf_xid( transport_message* msg, const char* osrf_xid );
// ---------------------------------------------------------------------------------
-// Formats the Jabber message as XML for encoding.
-// Returns NULL on error
-// ---------------------------------------------------------------------------------
-char* message_to_xml( const transport_message* msg );
-
-
-// ---------------------------------------------------------------------------------
// Call this to create the encoded XML for sending on the wire.
// This is a separate function so that encoding will not necessarily have
// to happen on all messages (i.e. typically only occurs on outbound messages).
@@ -75,11 +69,6 @@
int message_free( transport_message* msg );
// ---------------------------------------------------------------------------------
-// Prepares the shared XML document
-// ---------------------------------------------------------------------------------
-//int message_init_xml();
-
-// ---------------------------------------------------------------------------------
// Determines the username of a Jabber ID. This expects a pre-allocated char
// array for the return value.
// ---------------------------------------------------------------------------------
Modified: trunk/src/libopensrf/transport_client.c
===================================================================
--- trunk/src/libopensrf/transport_client.c 2009-01-05 17:05:45 UTC (rev 1569)
+++ trunk/src/libopensrf/transport_client.c 2009-01-05 17:36:42 UTC (rev 1570)
@@ -1,21 +1,5 @@
#include <opensrf/transport_client.h>
-#define MESSAGE_LIST_HEAD 1
-#define MESSAGE_LIST_ITEM 2
-
-// ---------------------------------------------------------------------------
-// Represents a node in a linked list. The node holds a pointer to the next
-// node (which is null unless set), a pointer to a transport_message, and
-// and a type variable (which is not really curently necessary).
-// ---------------------------------------------------------------------------
-struct message_list_struct {
- struct message_list_struct* next;
- transport_message* message;
- int type;
-};
-typedef struct message_list_struct transport_message_list;
-typedef struct message_list_struct transport_message_node;
-
static void client_message_handler( void* client, transport_message* msg );
//int main( int argc, char** argv );
@@ -69,15 +53,11 @@
/* build and clear the client object */
transport_client* client = safe_malloc( sizeof( transport_client) );
- /* build and clear the message list */
- client->m_list = safe_malloc( sizeof( transport_message_list ) );
-
- client->m_list->next = NULL;
- client->m_list->message = NULL;
- client->m_list->type = MESSAGE_LIST_HEAD;
-
- /* build the session */
+ /* start with an empty message queue */
+ client->msg_q_head = NULL;
+ client->msg_q_tail = NULL;
+ /* build the session */
client->session = init_transport( server, port, unix_path, client, component );
client->session->message_callback = client_message_handler;
@@ -116,85 +96,63 @@
transport_message* client_recv( transport_client* client, int timeout ) {
if( client == NULL ) { return NULL; }
- transport_message_node* node;
- transport_message* msg;
+ int error = 0; /* boolean */
+ if( NULL == client->msg_q_head ) {
- /* see if there are any message in the messages queue */
- if( client->m_list->next != NULL ) {
- /* pop off the first one... */
- node = client->m_list->next;
- client->m_list->next = node->next;
- msg = node->message;
- free( node );
- return msg;
- }
+ /* no message available? try to get one */
+ if( timeout == -1 ) { /* wait potentially forever for data to arrive */
- if( timeout == -1 ) { /* wait potentially forever for data to arrive */
-
- while( client->m_list->next == NULL ) {
- // if( ! session_wait( client->session, -1 ) ) {
int x;
- if( (x = session_wait( client->session, -1 )) ) {
- osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
- client->error = 1;
- return NULL;
- }
- }
+ do {
+ if( (x = session_wait( client->session, -1 )) ) {
+ osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
+ error = 1;
+ break;
+ }
+ } while( client->msg_q_head == NULL );
- } else { /* wait at most timeout seconds */
+ } else { /* loop up to 'timeout' seconds waiting for data to arrive */
-
- /* if not, loop up to 'timeout' seconds waiting for data to arrive */
- time_t start = time(NULL);
- time_t remaining = (time_t) timeout;
+ /* This loop assumes that a time_t is denominated in seconds -- not */
+ /* guaranteed by Standard C, but a fair bet for Linux or UNIX */
- int counter = 0;
+ time_t start = time(NULL);
+ time_t remaining = (time_t) timeout;
- int wait_ret;
- while( client->m_list->next == NULL && remaining >= 0 ) {
+ int wait_ret;
+ do {
+ if( (wait_ret = session_wait( client->session, (int) remaining)) ) {
+ error = 1;
+ osrfLogDebug(OSRF_LOG_MARK,
+ "session_wait returned failure code %d: setting error=1\n", wait_ret);
+ break;
+ }
- if( (wait_ret= session_wait( client->session, remaining)) ) {
- client->error = 1;
- osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d: setting error=1\n", wait_ret);
- return NULL;
- }
-
- ++counter;
-
-#ifdef _ROUTER
- // session_wait returns -1 if there is no more data and we're a router
- if( remaining == 0 ) { // && wait_ret == -1 ) {
- break;
- }
-#else
- if( remaining == 0 ) // or infinite loop
- break;
-#endif
-
- remaining -= (int) (time(NULL) - start);
+ remaining -= time(NULL) - start;
+ } while( NULL == client->msg_q_head && remaining > 0 );
}
-
}
+
+ transport_message* msg = NULL;
- /* again, see if there are any messages in the message queue */
- if( client->m_list->next != NULL ) {
- /* pop off the first one... */
- node = client->m_list->next;
- client->m_list->next = node->next;
- msg = node->message;
- free( node );
- return msg;
-
- } else {
- return NULL;
+ if( error )
+ client->error = 1;
+ else if( client->msg_q_head != NULL ) {
+ /* got message(s); dequeue the oldest one */
+ msg = client->msg_q_head;
+ client->msg_q_head = msg->next;
+ msg->next = NULL; /* shouldn't be necessary; nullify for good hygiene */
+ if( NULL == client->msg_q_head )
+ client->msg_q_tail = NULL;
}
+
+ return msg;
}
// ---------------------------------------------------------------------------
// This is the message handler required by transport_session. This handler
-// takes all incoming messages and puts them into the back of a linked list
-// of messages.
+// takes an incoming message and adds it to the tail of a message queue.
// ---------------------------------------------------------------------------
static void client_message_handler( void* client, transport_message* msg ){
@@ -203,21 +161,14 @@
transport_client* cli = (transport_client*) client;
- transport_message_node* node = safe_malloc( sizeof( transport_message_node) );
- node->next = NULL;
- node->type = MESSAGE_LIST_ITEM;
- node->message = msg;
-
-
- /* find the last node and put this onto the end */
- transport_message_node* tail = cli->m_list;
- transport_message_node* current = tail->next;
-
- while( current != NULL ) {
- tail = current;
- current = current->next;
+ /* add the new message to the tail of the queue */
+ if( NULL == cli->msg_q_head )
+ cli->msg_q_tail = cli->msg_q_head = msg;
+ else {
+ cli->msg_q_tail->next = msg;
+ cli->msg_q_tail = msg;
}
- tail->next = node;
+ msg->next = NULL;
}
@@ -225,18 +176,16 @@
if(client == NULL) return 0;
session_free( client->session );
- transport_message_node* current = client->m_list->next;
- transport_message_node* next;
+ transport_message* current = client->msg_q_head;
+ transport_message* next;
/* deallocate the list of messages */
while( current != NULL ) {
next = current->next;
- message_free( current->message );
- free(current);
+ message_free( current );
current = next;
}
- free( client->m_list );
free( client );
return 1;
}
Modified: trunk/src/libopensrf/transport_message.c
===================================================================
--- trunk/src/libopensrf/transport_message.c 2009-01-05 17:05:45 UTC (rev 1569)
+++ trunk/src/libopensrf/transport_message.c 2009-01-05 17:36:42 UTC (rev 1570)
@@ -1,6 +1,5 @@
#include <opensrf/transport_message.h>
-
// ---------------------------------------------------------------------------------
// Allocates and initializes a new transport_message
// ---------------------------------------------------------------------------------
@@ -45,6 +44,7 @@
msg->error_code = 0;
msg->broadcast = 0;
msg->msg_xml = NULL;
+ msg->next = NULL;
return msg;
}
@@ -72,6 +72,7 @@
new_msg->error_code = 0;
new_msg->broadcast = 0;
new_msg->msg_xml = NULL;
+ new_msg->next = NULL;
xmlKeepBlanksDefault(0);
xmlDocPtr msg_doc = xmlReadDoc( BAD_CAST msg_xml, NULL, NULL, 0 );
@@ -160,65 +161,45 @@
new_msg->body = strdup("");
new_msg->msg_xml = xmlDocToString(msg_doc, 0);
- xmlFreeDoc(msg_doc);
- xmlCleanupParser();
+ xmlFreeDoc(msg_doc);
+ xmlCleanupParser();
return new_msg;
}
void message_set_osrf_xid( transport_message* msg, const char* osrf_xid ) {
- if(!msg) return;
- if( osrf_xid )
- msg->osrf_xid = strdup(osrf_xid);
- else msg->osrf_xid = strdup("");
+ if( msg ) {
+ if( msg->osrf_xid ) free( msg->osrf_xid );
+ msg->osrf_xid = strdup( osrf_xid ? osrf_xid : "" );
+ }
}
void message_set_router_info( transport_message* msg, const char* router_from,
const char* router_to, const char* router_class, const char* router_command,
int broadcast_enabled ) {
- if( !msg ) return;
-
- if(router_from)
- msg->router_from = strdup(router_from);
- else
- msg->router_from = strdup("");
+ if( msg ) {
- if(router_to)
- msg->router_to = strdup(router_to);
- else
- msg->router_to = strdup("");
+ /* free old values, if any */
+ if( msg->router_from ) free( msg->router_from );
+ if( msg->router_to ) free( msg->router_to );
+ if( msg->router_class ) free( msg->router_class );
+ if( msg->router_command ) free( msg->router_command );
- if(router_class)
- msg->router_class = strdup(router_class);
- else
- msg->router_class = strdup("");
-
- if(router_command)
- msg->router_command = strdup(router_command);
- else
- msg->router_command = strdup("");
+ /* install new values */
+ msg->router_from = strdup( router_from ? router_from : "" );
+ msg->router_to = strdup( router_to ? router_to : "" );
+ msg->router_class = strdup( router_class ? router_class : "" );
+ msg->router_command = strdup( router_command ? router_command : "" );
+ msg->broadcast = broadcast_enabled;
- msg->broadcast = broadcast_enabled;
-
- if( msg->router_from == NULL || msg->router_to == NULL ||
- msg->router_class == NULL || msg->router_command == NULL )
- osrfLogError(OSRF_LOG_MARK, "message_set_router_info(): Out of Memory" );
-
- return;
+ if( msg->router_from == NULL || msg->router_to == NULL ||
+ msg->router_class == NULL || msg->router_command == NULL )
+ osrfLogError(OSRF_LOG_MARK, "message_set_router_info(): Out of Memory" );
+ }
}
-
-/* encodes the message for traversal */
-int message_prepare_xml( transport_message* msg ) {
- if( !msg ) return 0;
- if( msg->msg_xml == NULL )
- msg->msg_xml = message_to_xml( msg );
- return 1;
-}
-
-
// ---------------------------------------------------------------------------------
//
// ---------------------------------------------------------------------------------
@@ -240,16 +221,16 @@
free(msg);
return 1;
}
-
+
+
// ---------------------------------------------------------------------------------
-// Allocates a char* holding the XML representation of this jabber message
+// Encodes the message as XML for traversal; stores in msg_xml member
// ---------------------------------------------------------------------------------
-char* message_to_xml( const transport_message* msg ) {
+int message_prepare_xml( transport_message* msg ) {
- //int bufsize;
- //xmlChar* xmlbuf;
- //char* encoded_body;
-
+ if( !msg ) return 0;
+ if( msg->msg_xml ) return 1; /* already done */
+
xmlNodePtr message_node;
xmlNodePtr body_node;
xmlNodePtr thread_node;
@@ -260,11 +241,6 @@
xmlKeepBlanksDefault(0);
- if( ! msg ) {
- osrfLogWarning(OSRF_LOG_MARK, "Passing NULL message to message_to_xml()");
- return NULL;
- }
-
doc = xmlReadDoc( BAD_CAST "<message/>", NULL, NULL, XML_PARSE_NSCLEAN );
message_node = xmlDocGetRootElement(doc);
@@ -318,11 +294,13 @@
xmlBufferPtr xmlbuf = xmlBufferCreate();
xmlNodeDump( xmlbuf, doc, xmlDocGetRootElement(doc), 0, 0);
- char* xml = strdup((const char*) (xmlBufferContent(xmlbuf)));
+ msg->msg_xml = strdup((const char*) (xmlBufferContent(xmlbuf)));
+
xmlBufferFree(xmlbuf);
xmlFreeDoc( doc );
xmlCleanupParser();
- return xml;
+
+ return 1;
}
More information about the opensrf-commits
mailing list