[Opensrf-commits] r1846 - trunk/src/router (scottmk)
svn at svn.open-ils.org
svn at svn.open-ils.org
Sun Nov 8 21:47:27 EST 2009
Author: scottmk
Date: 2009-11-08 21:47:26 -0500 (Sun, 08 Nov 2009)
New Revision: 1846
Modified:
trunk/src/router/osrf_router.c
trunk/src/router/osrf_router.h
trunk/src/router/osrf_router_main.c
Log:
Miscellaneous minor tweaks:
1. Moved nested #includes out of the header file and into the
implementation files as needed.
2. Additions and refinements to comments; adjustments to white space.
3. Changed several functions to return void instead of int, since we don't
look at the return values anyway.
4. Added the const qualifier to several function parameters.
5. In osrfRouterHandleAppRequest(): initialize arr[], an array of pointers,
by setting each pointer to NULL. We had been using memset() on the lot,
relying on the non-portable assumption that a NULL pointer is
represented by all-bits-zero.
6. Minor rearrangements of the logic here and there, mostly to free things
as soon as we're done with them instead of waiting until the end of the block,
or to defer the declarations of things until we're about to use them.
7. Replaced a couple of calls to jsonObjectToSimpleString() with calls to
jsonObjectGetString(), in order to eliminate a malloc() and a free().
8. Renamed osrfRouterHandleAppResponse() to osrfRouterSendAppResponse(),
which is more descriptive and less vague.
M src/router/osrf_router.h
M src/router/osrf_router.c
M src/router/osrf_router_main.c
Modified: trunk/src/router/osrf_router.c
===================================================================
--- trunk/src/router/osrf_router.c 2009-11-06 12:34:34 UTC (rev 1845)
+++ trunk/src/router/osrf_router.c 2009-11-09 02:47:26 UTC (rev 1846)
@@ -1,4 +1,14 @@
+#include <sys/select.h>
+#include <signal.h>
+#include "opensrf/utils.h"
+#include "opensrf/log.h"
+#include "opensrf/osrf_list.h"
+#include "opensrf/string_array.h"
+#include "opensrf/osrf_hash.h"
#include "osrf_router.h"
+#include "opensrf/transport_client.h"
+#include "opensrf/transport_message.h"
+#include "opensrf/osrf_message.h"
/**
@file osrf_router.c
@@ -8,11 +18,10 @@
each server class. The Jabber IDs for these sessions are distinguished by the use of
the class names as Jabber resource names.
- For each server class there may be multiple server nodes.
+ For each server class there may be multiple server nodes. Each node corresponds to a
+ listener process for a service.
*/
-/* a router maintains a list of server classes */
-
/**
@brief Collection of server classes, with connection parameters for Jabber.
*/
@@ -25,13 +34,13 @@
osrfRouterClass.
*/
osrfHash* classes;
- osrfHashIterator* class_itr; /**< For traversing the list of classes */
+ osrfHashIterator* class_itr; /**< For traversing the list of classes. */
char* domain; /**< Domain name of Jabber server. */
char* name; /**< Router's username for the Jabber logon. */
char* resource; /**< Router's resource name for the Jabber logon. */
char* password; /**< Router's password for the Jabber logon. */
int port; /**< Jabber's port number. */
- sig_atomic_t stop; /**< To be set by signal handler to interrupt main loop */
+ sig_atomic_t stop; /**< To be set by signal handler to interrupt main loop. */
/** Array of client domains that we allow to send requests through us. */
osrfStringArray* trustedClients;
@@ -71,11 +80,11 @@
static osrfRouterClass* osrfRouterAddClass( osrfRouter* router, const char* classname );
static void osrfRouterClassAddNode( osrfRouterClass* rclass, const char* remoteId );
-static void osrfRouterHandleCommand( osrfRouter* router, transport_message* msg );
+static void osrfRouterHandleCommand( osrfRouter* router, const transport_message* msg );
static void osrfRouterClassHandleMessage( osrfRouter* router,
- osrfRouterClass* rclass, transport_message* msg );
+ osrfRouterClass* rclass, const transport_message* msg );
static void osrfRouterRemoveClass( osrfRouter* router, const char* classname );
-static int osrfRouterClassRemoveNode( osrfRouter* router, const char* classname,
+static void osrfRouterClassRemoveNode( osrfRouter* router, const char* classname,
const char* remoteId );
static void osrfRouterClassFree( char* classname, void* rclass );
static void osrfRouterNodeFree( char* remoteId, void* node );
@@ -87,16 +96,16 @@
static void osrfRouterClassHandleIncoming( osrfRouter* router,
const char* classname, osrfRouterClass* class );
static transport_message* osrfRouterClassHandleBounce( osrfRouter* router,
- const char* classname, osrfRouterClass* rclass, transport_message* msg );
-static void osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg );
-static int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg,
- osrfMessage* omsg );
-static int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg,
- osrfMessage* omsg );
-static int osrfRouterHandleAppResponse( osrfRouter* router,
- transport_message* msg, osrfMessage* omsg, const jsonObject* response );
-static int osrfRouterHandleMethodNFound( osrfRouter* router, transport_message* msg,
- osrfMessage* omsg );
+ const char* classname, osrfRouterClass* rclass, const transport_message* msg );
+static void osrfRouterHandleAppRequest( osrfRouter* router, const transport_message* msg );
+static void osrfRouterRespondConnect( osrfRouter* router, const transport_message* msg,
+ const osrfMessage* omsg );
+static void osrfRouterProcessAppRequest( osrfRouter* router, const transport_message* msg,
+ const osrfMessage* omsg );
+static void osrfRouterSendAppResponse( osrfRouter* router, const transport_message* msg,
+ const osrfMessage* omsg, const jsonObject* response );
+static void osrfRouterHandleMethodNFound( osrfRouter* router,
+ const transport_message* msg, const osrfMessage* omsg );
#define ROUTER_REGISTER "register"
#define ROUTER_UNREGISTER "unregister"
@@ -108,10 +117,11 @@
#define ROUTER_REQUEST_STATS_CLASS_SUMMARY "opensrf.router.info.stats.class.summary"
/**
- @brief Stop the otherwise infinite main loop of the router.
+ @brief Stop the otherwise endless main loop of the router.
@param router Pointer to the osrfRouter to be stopped.
- To be called by a signal handler.
+ To be called by a signal handler. We don't stop the loop immediately; we just set
+ a switch that the main loop checks on each iteration.
*/
void router_stop( osrfRouter* router )
{
@@ -121,14 +131,14 @@
/**
@brief Allocate and initialize a new osrfRouter.
- @param domain Domain name of Jabber server
+ @param domain Domain name of Jabber server.
@param name Router's username for the Jabber logon.
@param resource Router's resource name for the Jabber logon.
@param password Router's password for the Jabber logon.
@param port Jabber's port number.
@param trustedClients Array of client domains that we allow to send requests through us.
@param trustedServers Array of server domains that we allow to register, etc. with us.
- @return Pointer to the newly allocated osrfRouter.
+ @return Pointer to the newly allocated osrfRouter, or NULL upon error.
Don't connect to Jabber yet. We'll do that later, upon a call to osrfRouterConnect().
@@ -166,7 +176,7 @@
/**
@brief Connect to Jabber.
- @param router Pointer to the osrfRouter to connect to Jabber
+ @param router Pointer to the osrfRouter to connect to Jabber.
@return 0 if successful, or -1 on error.
Allow up to 10 seconds for the logon to succeed.
@@ -184,10 +194,9 @@
/**
@brief Enter endless loop to receive and respond to input.
@param router Pointer to the osrfRouter that's looping.
- @return
On each iteration: wait for incoming messages to arrive on any of our sockets -- i.e.
- either the top level socket belong to the router or any of the lower level sockets
+ either the top level socket belonging to the router or any of the lower level sockets
belonging to the classes. React to the incoming activity as needed.
We don't exit the loop until we receive a signal to stop, or until we encounter an error.
@@ -331,6 +340,8 @@
if( msg->is_error ) {
+ // A previous message bounced. Try to send a clone of it to a
+ // different node of the same class.
transport_message* bouncedMessage = osrfRouterClassHandleBounce(
router, classname, class, msg );
/* handle bounced message */
@@ -366,7 +377,7 @@
- "unregister" -- Remove a node from a class, and the class as well if no nodes are
left for it.
*/
-static void osrfRouterHandleCommand( osrfRouter* router, transport_message* msg ) {
+static void osrfRouterHandleCommand( osrfRouter* router, const transport_message* msg ) {
if(!(router && msg && msg->router_class)) return;
if( !strcmp( msg->router_command, ROUTER_REGISTER ) ) {
@@ -393,7 +404,7 @@
/**
- @brief Adds an osrfRouterClass to a router, and open a connection for it.
+ @brief Add an osrfRouterClass to a router, and open a connection for it.
@param router Pointer to the osrfRouter.
@param classname The name of the class this node handles.
@return A pointer to the new osrfRouterClass, or NULL upon error.
@@ -430,7 +441,7 @@
/**
@brief Add a new server node to an osrfRouterClass.
- @param rclass Pointer to the osrfRouterClass to add the node to.
+ @param rclass Pointer to the osrfRouterClass to which we are to add the node.
@param remoteId The remote login of the osrfRouterNode.
*/
static void osrfRouterClassAddNode( osrfRouterClass* rclass, const char* remoteId ) {
@@ -446,15 +457,6 @@
osrfHashSet( rclass->nodes, node, remoteId );
}
-/* copy off the lastMessage, remove the offending node, send error if it's the last node
- ? return NULL if it's the last node ?
-*/
-
-/* handles case where router node is not longer reachable. copies over the
- data from the last sent message and returns a newly crafted suitable for treating
- as a newly inconing message. Removes the dead node and If there are no more
- nodes to send the new message to, returns NULL.
-*/
/**
@brief Handle an input message representing a Jabber error stanza.
@param router Pointer to the current osrfRouter.
@@ -466,11 +468,12 @@
The presumption is that the relevant node is dead. If another node is available for
the same class, then remove the dead one, create a clone of the message to be sent
elsewhere, and return a pointer to it. If there is no other node for the same class,
- send a cancel message back to Jabber, and return NULL. If we can't even do that because
- the entire class is dead, log a message to that effect and return NULL.
+ send a cancel message back to the sender, remove both the node and the class it belongs
+ to, and return NULL. If we can't even do that because the entire class is dead, log
+ a message to that effect and return NULL.
*/
static transport_message* osrfRouterClassHandleBounce( osrfRouter* router,
- const char* classname, osrfRouterClass* rclass, transport_message* msg ) {
+ const char* classname, osrfRouterClass* rclass, const transport_message* msg ) {
osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleBounce()");
@@ -525,29 +528,35 @@
}
-/*
- Handles class level requests
- If we get a regular message, we send it to the next node in the list of nodes
- if we get an error, it's a bounce back from a previous attempt. We take the
- body and thread from the last sent on the node that had the bounced message
- and propogate them on to the new message being sent
- @return 0 on success
+/**
+ @brief Forward a class-level message to a listener for the corresponding service.
+ @param router Pointer to the current osrfRouter.
+ @param rclass Pointer to the class to which the message is directed.
+ @param msg Pointer to the message to be forwarded.
+
+ Pick a node for the specified class, and forward the message to it.
+
+ We use an iterator, stored with the class, to maintain a position in the class's list
+ of nodes. Advance the iterator to pick the next node, and if we reach the end, go
+ back to the beginning of the list.
*/
static void osrfRouterClassHandleMessage(
- osrfRouter* router, osrfRouterClass* rclass, transport_message* msg ) {
+ osrfRouter* router, osrfRouterClass* rclass, const transport_message* msg ) {
if(!(router && rclass && msg)) return;
osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleMessage()");
+ // Pick a node, in a round-robin.
osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
- if(!node) {
+ if(!node) { // wrap around to the beginning of the list
osrfHashIteratorReset(rclass->itr);
node = osrfHashIteratorNext( rclass->itr );
}
- if(node) {
+ if(node) { // should always be true -- no class without a node
- transport_message* new_msg= message_init( msg->body,
+ // Build a transport message
+ transport_message* new_msg = message_init( msg->body,
msg->subject, msg->thread, node->remoteId, msg->sender );
message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
message_set_osrf_xid( new_msg, msg->osrf_xid );
@@ -555,9 +564,11 @@
osrfLogInfo( OSRF_LOG_MARK, "Routing message:\nfrom: [%s]\nto: [%s]",
new_msg->router_from, new_msg->recipient );
+ // Save it for possible future reference
message_free( node->lastMessage );
node->lastMessage = new_msg;
+ // Send it
if ( client_send_message( rclass->connection, new_msg ) == 0 )
node->count++;
@@ -566,7 +577,7 @@
osrfLogWarning( OSRF_LOG_MARK, "Error sending message from %s to %s\n%s",
new_msg->sender, new_msg->recipient, new_msg->msg_xml );
}
-
+ // We don't free new_msg here because we saved it as node->lastMessage.
}
}
@@ -576,8 +587,8 @@
@param router Pointer to the osrfRouter.
@param classname The name of the class to be removed.
- A callback function, installed in the osrfHash, frees the osrfRouterClass and any
- associated nodes.
+ Delete an osrfRouterClass from the router's list of classes. Indirectly (via a callback
+ function installed in the osrfHash), free the osrfRouterClass and any associated nodes.
*/
static void osrfRouterRemoveClass( osrfRouter* router, const char* classname ) {
if( router && router->classes && classname ) {
@@ -587,46 +598,41 @@
}
-/*
- Removes the given node from the class. Also, if this is that last node in the set,
- removes the class from the router
- @return 0 on successful removal with no class removal
- @return 1 on successful remove with class removal
- @return -1 error on removal
+/**
+ @brief Remove a node from a class. If the class thereby becomes empty, remove it as well.
+ @param router Pointer to the current osrfRouter.
+ @param classname Class name.
+ @param remoteId Identifier for the node to be removed.
*/
-static int osrfRouterClassRemoveNode(
+static void osrfRouterClassRemoveNode(
osrfRouter* router, const char* classname, const char* remoteId ) {
- if(!(router && router->classes && classname && remoteId)) return 0;
+ if(!(router && router->classes && classname && remoteId)) // sanity check
+ return;
osrfLogInfo( OSRF_LOG_MARK, "Removing router node %s", remoteId );
osrfRouterClass* class = osrfRouterFindClass( router, classname );
-
if( class ) {
-
osrfHashRemove( class->nodes, remoteId );
if( osrfHashGetCount(class->nodes) == 0 ) {
osrfRouterRemoveClass( router, classname );
- return 1;
}
-
- return 0;
}
-
- return -1;
}
/**
@brief Free a router class object.
- @param classname Class name.
+ @param classname Class name (not used).
@param c Pointer to the osrfRouterClass, cast to a void pointer.
- This function is designated to the osrfHash as a callback.
+ This function is invoked as a callback when we remove an osrfRouterClass from the
+ router's list of classes.
*/
static void osrfRouterClassFree( char* classname, void* c ) {
- if(!(classname && c)) return;
+ if( !c )
+ return;
osrfRouterClass* rclass = (osrfRouterClass*) c;
client_disconnect( rclass->connection );
client_free( rclass->connection );
@@ -645,7 +651,7 @@
/**
@brief Free an osrfRouterNode.
- @param remoteId Name of router (not used).
+ @param remoteId Jabber ID of node (not used).
@param n Pointer to the osrfRouterNode to be freed, cast to a void pointer.
This is a callback installed in an osrfHash (the nodes member of an osrfRouterClass).
@@ -659,6 +665,13 @@
}
+/**
+ @brief Free an osrfRouter and everything it owns.
+ @param router Pointer to the osrfRouter to be freed.
+
+ The osrfRouterClasses and osrfRouterNodes are freed by callback functions installed in
+ the osrfHashes.
+*/
void osrfRouterFree( osrfRouter* router ) {
if(!router) return;
@@ -677,9 +690,11 @@
}
-
-/*
- Finds the class associated with the given class name in the list of classes
+/**
+ @brief Given a class name, find the corresponding osrfRouterClass.
+ @param router Pointer to the osrfRouter that owns the osrfRouterClass.
+ @param classname Name of the class.
+ @return Pointer to a matching osrfRouterClass if found, or NULL if not.
*/
static osrfRouterClass* osrfRouterFindClass( osrfRouter* router, const char* classname ) {
if(!( router && router->classes && classname )) return NULL;
@@ -687,8 +702,11 @@
}
-/*
- Finds the router node within this class with the given remote id
+/**
+ @brief Find a given node for a given class.
+ @param rclass Pointer to the osrfRouterClass in which to search.
+ @param remoteId Jabber ID of the node for which to search.
+ @return Pointer to the matching osrfRouterNode, if found; otherwise NULL.
*/
static osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass,
const char* remoteId ) {
@@ -697,12 +715,6 @@
}
-/*
- Clears and populates the provided fd_set* with file descriptors
- from the router's top level connection as well as each of the
- router class connections
- @return The largest file descriptor found in the filling process
-*/
/**
@brief Fill an fd_set with all the sockets owned by the osrfRouter.
@param router Pointer to the osrfRouter whose sockets are to be used.
@@ -748,80 +760,129 @@
return maxfd;
}
-/*
- handles messages that don't have a 'router_command' set. They are assumed to
- be app request messages
+/**
+ @brief Handle a router-level message that isn't a command; presumed to be an app request.
+ @param router Pointer to the current osrfRouter.
+ @param msg Pointer to the incoming message.
+
+ The body of the transport_message is a JSON string, specifically a JSON array.
+ Translate the JSON into a series of osrfMessages, one for each element of the JSON
+ array. Process each osrfMessage in turn. Each message is either a CONNECT or a
+ REQUEST.
*/
-static void osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ) {
+static void osrfRouterHandleAppRequest( osrfRouter* router, const transport_message* msg ) {
int T = 32;
osrfMessage* arr[T];
- memset(arr, 0, sizeof(arr));
+
+ // Initialize pointer array to all NULLs
+ int i;
+ for( i = 0; i < T; ++i )
+ arr[ i ] = NULL;
+ // Translate the JSON into an array of pointers to osrfMessage
int num_msgs = osrf_message_deserialize( msg->body, arr, T );
osrfMessage* omsg = NULL;
- int i;
- for( i = 0; i != num_msgs; i++ ) {
+ // Process each osrfMessage
+ for( i = 0; i < num_msgs; i++ ) {
- if( !(omsg = arr[i]) ) continue;
+ omsg = arr[i];
+ if( omsg ) {
- switch( omsg->m_type ) {
+ switch( omsg->m_type ) {
- case CONNECT:
- osrfRouterRespondConnect( router, msg, omsg );
- break;
+ case CONNECT:
+ osrfRouterRespondConnect( router, msg, omsg );
+ break;
- case REQUEST:
- osrfRouterProcessAppRequest( router, msg, omsg );
- break;
+ case REQUEST:
+ osrfRouterProcessAppRequest( router, msg, omsg );
+ break;
- default: break;
+ default:
+ break;
+ }
+
+ osrfMessageFree( omsg );
}
-
- osrfMessageFree( omsg );
}
return;
}
-static int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg,
- osrfMessage* omsg ) {
- if(!(router && msg && omsg)) return -1;
+/**
+ @brief Respond to a CONNECT message.
+ @param router Pointer to the current osrfRouter.
+ @param msg Pointer to the transport_message that the osrfMessage came from.
+ @param omsg Pointer to the osrfMessage to be processed.
- osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
+ An application is trying to connect to the router. Reply with a STATUS message
+ signifying success.
+ "CONNECT" is a bit of a misnomer. We don't establish a stateful session; i.e. we
+ don't retain any memory of this message. We just confirm that the router is alive,
+ and that the client has a good address for it.
+*/
+static void osrfRouterRespondConnect( osrfRouter* router, const transport_message* msg,
+ const osrfMessage* omsg ) {
+ if(!(router && msg && omsg))
+ return;
+
osrfLogDebug( OSRF_LOG_MARK, "router received a CONNECT message from %s", msg->sender );
+ // Build a success message
+ osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
osrf_message_set_status_info(
success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
+ // Translate the success message into JSON,
+ // and then package the JSON in a transport message
char* data = osrf_message_serialize(success);
+ osrfMessageFree( success );
+ transport_message* return_msg = message_init(
+ data, // message payload, translated from osrfMessage
+ "", // no subject
+ msg->thread, // same thread
+ msg->sender, // destination (client's Jabber ID)
+ "" // don't send our address; client already has it
+ );
+ free( data );
- transport_message* return_m = message_init(
- data, "", msg->thread, msg->sender, "" );
+ client_send_message( router->connection, return_msg );
+ message_free( return_msg );
+}
- client_send_message(router->connection, return_m);
- free(data);
- osrfMessageFree(success);
- message_free(return_m);
+/**
+ @brief Respond to a REQUEST message.
+ @param router Pointer to the current osrfRouter.
+ @param msg Pointer to the transport_message that the osrfMessage came from.
+ @param omsg Pointer to the osrfMessage to be processed.
- return 0;
-}
+ Respond to an information request from an application. Most types of request involve
+ one or more counts of messages successfully routed.
+ Request types currently supported:
+ - "opensrf.router.info.class.list" -- list of class names.
+ - "opensrf.router.info.stats.class.summary" -- total count for a specified class.
+ - "opensrf.router.info.stats.class" -- count for every node of a specified class.
+ - "opensrf.router.info.stats.class.all" -- count for every node of every class.
+ - "opensrf.router.info.stats.class.node.all" -- total count for every class.
+*/
+static void osrfRouterProcessAppRequest( osrfRouter* router, const transport_message* msg,
+ const osrfMessage* omsg ) {
+ if(!(router && msg && omsg && omsg->method_name))
+ return;
-static int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg,
- osrfMessage* omsg ) {
-
- if(!(router && msg && omsg && omsg->method_name)) return -1;
-
osrfLogInfo( OSRF_LOG_MARK, "Router received app request: %s", omsg->method_name );
+ // Branch on the request type. Build a jsonObject as an answer to the request.
jsonObject* jresponse = NULL;
if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
+ // Prepare an array of class names.
int i;
jresponse = jsonNewObjectType(JSON_ARRAY);
@@ -830,25 +891,25 @@
jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
osrfStringArrayFree(keys);
-
} else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS_SUMMARY )) {
- osrfRouterClass* class;
- osrfRouterNode* node;
+ // Prepare a count of all the messages successfully routed for a given class.
int count = 0;
- char* classname = jsonObjectToSimpleString( jsonObjectGetIndex( omsg->_params, 0 ) );
-
+ // class name is the first parameter
+ const char* classname = jsonObjectGetString( jsonObjectGetIndex( omsg->_params, 0 ) );
if (!classname)
- return -1;
+ return;
- class = osrfHashGet(router->classes, classname);
- free(classname);
+ osrfRouterClass* class = osrfHashGet(router->classes, classname);
+ // For each node: add the count to the total.
+ osrfRouterNode* node;
osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
while( (node = osrfHashIteratorNext(node_itr)) ) {
count += node->count;
- //jsonObjectSetKey( class_res, node->remoteId, jsonNewNumberObject( (double) node->count ) );
+ // jsonObjectSetKey( class_res, node->remoteId,
+ // jsonNewNumberObject( (double) node->count ) );
}
osrfHashIteratorFree(node_itr);
@@ -856,18 +917,19 @@
} else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS )) {
- osrfRouterClass* class;
- osrfRouterNode* node;
+ // Prepare a hash for a given class. Key: the remoteId of a node. Datum: the
+ // number of messages successfully routed for that node.
- char* classname = jsonObjectToSimpleString( jsonObjectGetIndex( omsg->_params, 0 ) );
-
+ // class name is the first parameter
+ const char* classname = jsonObjectGetString( jsonObjectGetIndex( omsg->_params, 0 ) );
if (!classname)
- return -1;
+ return;
jresponse = jsonNewObjectType(JSON_HASH);
- class = osrfHashGet(router->classes, classname);
- free(classname);
+ osrfRouterClass* class = osrfHashGet(router->classes, classname);
+ // For each node: get the count and store it in the hash.
+ osrfRouterNode* node;
osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
while( (node = osrfHashIteratorNext(node_itr)) ) {
jsonObjectSetKey( jresponse, node->remoteId,
@@ -877,16 +939,20 @@
} else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS_FULL )) {
+ // Prepare a hash of hashes, giving the message counts for each node for each class.
+
osrfRouterClass* class;
osrfRouterNode* node;
- jresponse = jsonNewObjectType(JSON_HASH);
+ jresponse = jsonNewObjectType(JSON_HASH); // Key: class name.
+ // Traverse the list of classes.
osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
while( (class = osrfHashIteratorNext(class_itr)) ) {
- jsonObject* class_res = jsonNewObjectType(JSON_HASH);
+ jsonObject* class_res = jsonNewObjectType(JSON_HASH); // Key: remoteId of node.
const char* classname = osrfHashIteratorKey(class_itr);
+ // Traverse the list of nodes for the current class.
osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
while( (node = osrfHashIteratorNext(node_itr)) ) {
jsonObjectSetKey( class_res, node->remoteId,
@@ -901,19 +967,21 @@
} else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_NODE_FULL )) {
+ // Prepare a hash. Key: class name. Datum: total number of messages
+ // successfully routed for nodes of that class.
+
osrfRouterClass* class;
osrfRouterNode* node;
- int count;
jresponse = jsonNewObjectType(JSON_HASH);
osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
- while( (class = osrfHashIteratorNext(class_itr)) ) {
+ while( (class = osrfHashIteratorNext(class_itr)) ) { // For each class
- count = 0;
+ int count = 0;
const char* classname = osrfHashIteratorKey(class_itr);
osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
- while( (node = osrfHashIteratorNext(node_itr)) ) {
+ while( (node = osrfHashIteratorNext(node_itr)) ) { // For each node
count += node->count;
}
osrfHashIteratorFree(node_itr);
@@ -923,84 +991,91 @@
osrfHashIteratorFree(class_itr);
- } else {
+ } else { // None of the above
- return osrfRouterHandleMethodNFound( router, msg, omsg );
+ osrfRouterHandleMethodNFound( router, msg, omsg );
+ return;
}
-
- osrfRouterHandleAppResponse( router, msg, omsg, jresponse );
+ // Send the result back to the requester.
+ osrfRouterSendAppResponse( router, msg, omsg, jresponse );
jsonObjectFree(jresponse);
-
- return 0;
-
}
-static int osrfRouterHandleMethodNFound(
- osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
+/**
+ @brief Respond to an invalid REQUEST message.
+ @param router Pointer to the current osrfRouter.
+ @param msg Pointer to the transport_message that contained the REQUEST message.
+ @param omsg Pointer to the osrfMessage that contained the REQUEST.
+*/
+static void osrfRouterHandleMethodNFound( osrfRouter* router,
+ const transport_message* msg, const osrfMessage* omsg ) {
- osrfMessage* err = osrf_message_init( STATUS, omsg->thread_trace, 1);
- osrf_message_set_status_info( err,
- "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
+ // Create an exception message
+ osrfMessage* err = osrf_message_init( STATUS, omsg->thread_trace, 1 );
+ osrf_message_set_status_info( err,
+ "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
- char* data = osrf_message_serialize(err);
+ // Translate it into JSON
+ char* data = osrf_message_serialize(err);
+ osrfMessageFree( err );
- transport_message* tresponse = message_init(
- data, "", msg->thread, msg->sender, msg->recipient );
+ // Wrap the JSON up in a transport_message
+ transport_message* tresponse = message_init(
+ data, "", msg->thread, msg->sender, msg->recipient );
+ free(data);
- client_send_message(router->connection, tresponse );
-
- free(data);
- osrfMessageFree( err );
- message_free(tresponse);
- return 0;
+ // Send it
+ client_send_message( router->connection, tresponse );
+ message_free( tresponse );
}
-static int osrfRouterHandleAppResponse( osrfRouter* router,
- transport_message* msg, osrfMessage* omsg, const jsonObject* response ) {
+/**
+ @brief Send a response to a router REQUEST message.
+ @param router Pointer to the current osrfRouter.
+ @param msg Pointer to the transport_message that contained the REQUEST message.
+ @param omsg Pointer to the osrfMessage that contained the REQUEST.
+ @param response Pointer to the jsonObject that constitutes the response.
+*/
+static void osrfRouterSendAppResponse( osrfRouter* router, const transport_message* msg,
+ const osrfMessage* omsg, const jsonObject* response ) {
if( response ) { /* send the response message */
+ // Turn the jsonObject into JSON, and load it into an osrfMessage
osrfMessage* oresponse = osrf_message_init(
RESULT, omsg->thread_trace, omsg->protocol );
char* json = jsonObjectToJSON(response);
- osrf_message_set_result_content( oresponse, json);
+ osrf_message_set_result_content( oresponse, json );
+ free(json);
+ // Package the osrfMessage into a transport_message, and send it
char* data = osrf_message_serialize(oresponse);
+ osrfMessageFree(oresponse);
osrfLogDebug( OSRF_LOG_MARK, "Responding to client app request with data: \n%s\n", data );
transport_message* tresponse = message_init(
data, "", msg->thread, msg->sender, msg->recipient );
+ free(data);
client_send_message(router->connection, tresponse );
-
- osrfMessageFree(oresponse);
message_free(tresponse);
- free(json);
- free(data);
}
/* now send the 'request complete' message */
osrfMessage* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete",
OSRF_STATUS_COMPLETE );
-
char* statusdata = osrf_message_serialize(status);
+ osrfMessageFree(status);
transport_message* sresponse = message_init(
statusdata, "", msg->thread, msg->sender, msg->recipient );
+ free(statusdata);
+
client_send_message(router->connection, sresponse );
-
- free(statusdata);
- osrfMessageFree(status);
message_free(sresponse);
-
- return 0;
}
-
-
-
-
Modified: trunk/src/router/osrf_router.h
===================================================================
--- trunk/src/router/osrf_router.h 2009-11-06 12:34:34 UTC (rev 1845)
+++ trunk/src/router/osrf_router.h 2009-11-09 02:47:26 UTC (rev 1846)
@@ -1,21 +1,32 @@
#ifndef OSRF_ROUTER_H
#define OSRF_ROUTER_H
-#include <sys/select.h>
-#include <signal.h>
-#include <stdio.h>
+/**
+ @file
+ @brief Collection of routines for an OSRF router.
-#include "opensrf/utils.h"
-#include "opensrf/log.h"
-#include "opensrf/osrf_list.h"
-#include "opensrf/osrf_hash.h"
+ The router receives messages from clients and passes each one to a listener for the
+ targeted service. Where there are multiple listeners for the same service, the router
+ picks one on a round-robin basis. If a message bounces because the listener has died,
+ the router sends it to another listener for the same service, if one is available.
-#include "opensrf/string_array.h"
-#include "opensrf/transport_client.h"
-#include "opensrf/transport_message.h"
+ The server's response to the client, if any, bypasses the router. If the server needs to
+ set up a stateful session with a client, it does so directly (well, via Jabber). Only the
+ initial message from the client passes through the router.
-#include "opensrf/osrf_message.h"
+ Thus the router serves two main functions:
+ - It spreads the load across multiple listeners for the same service.
+ - It reroutes bounced messages to alternative listeners.
+ It also responds to requests for information about the number of messages routed to
+ different services and listeners.
+*/
+
+/*
+ Prerequisite:
+
+ string_array.h
+*/
#ifdef __cplusplus
extern "C" {
#endif
@@ -23,35 +34,16 @@
struct osrfRouterStruct;
typedef struct osrfRouterStruct osrfRouter;
-/**
- Allocates a new router.
- @param domain The jabber domain to connect to
- @param name The login name for the router
- @param resource The login resource for the router
- @param password The login password for the new router
- @param port The port to connect to the jabber server on
- @param trustedClients The array of client domains that we allow to send requests through us
- @param trustedServers The array of server domains that we allow to register, etc. with ust.
- @return The allocated router or NULL on memory error
- */
-osrfRouter* osrfNewRouter( const char* domain, const char* name, const char* resource,
+osrfRouter* osrfNewRouter( const char* domain, const char* name, const char* resource,
const char* password, int port, osrfStringArray* trustedClients,
osrfStringArray* trustedServers );
int osrfRouterConnect( osrfRouter* router );
-/**
- Waits for incoming data to route
- If this function returns, then the router's connection to the jabber server
- has failed.
- */
void osrfRouterRun( osrfRouter* router );
void router_stop( osrfRouter* router );
-/**
- Frees a router
- */
void osrfRouterFree( osrfRouter* router );
#ifdef __cplusplus
Modified: trunk/src/router/osrf_router_main.c
===================================================================
--- trunk/src/router/osrf_router_main.c 2009-11-06 12:34:34 UTC (rev 1845)
+++ trunk/src/router/osrf_router_main.c 2009-11-09 02:47:26 UTC (rev 1846)
@@ -2,13 +2,12 @@
@file osrf_router_main.c
@brief top level of OSRF Router
- This top level loads a configuration file and forks into one or more child
- processes. Each child process configures itself, daemonizes itself, and then (via a
- call to osrfRouterRun()) goes into an infinite loop to route messages among clients
- and servers.
+ This top level loads a configuration file and forks into one or more child processes.
+ Each child process configures itself, daemonizes itself, and then (via a call to
+ osrfRouterRun()) goes into an infinite loop to route messages among clients and servers.
The first command-line parameter is the name of the configuration file.
-
+
The second command-line parameter is the context -- an XML tag identifying the subset
of the configuration file that is relevant to this application (since a configuration
file may include information for multiple applications).
@@ -16,16 +15,14 @@
Any subsequent command-line parameters are silently ignored.
*/
-#include "osrf_router.h"
-#include <opensrf/osrfConfig.h>
-#include <opensrf/utils.h>
-#include <opensrf/log.h>
-#include <opensrf/osrf_json.h>
#include <signal.h>
+#include "opensrf/utils.h"
+#include "opensrf/log.h"
+#include "opensrf/osrf_list.h"
+#include "opensrf/string_array.h"
+#include "opensrf/osrfConfig.h"
+#include "osrf_router.h"
-/**
- An osrfRouter contains various bits and scraps that the router uses for networking.
-*/
static osrfRouter* router = NULL;
static sig_atomic_t stop_signal = 0;
@@ -40,7 +37,7 @@
number so that we can report it later and re-raise it.
*/
void routerSignalHandler( int signo ) {
-
+
signal( signo, routerSignalHandler );
router_stop( router );
stop_signal = signo;
@@ -52,7 +49,7 @@
@param argv Pointer to array of items on command line.
@return System return code.
- Load configuration file, spawn zero or more child processes, and exit.
+ Load a configuration file, spawn zero or more child processes, and exit.
*/
int main( int argc, char* argv[] ) {
@@ -66,51 +63,50 @@
const char* context = argv[2];
/* Get a set of router definitions from a config file */
-
+
osrfConfig* cfg = osrfConfigInit(config_file, context);
if( NULL == cfg ) {
osrfLogError( OSRF_LOG_MARK, "Router can't load config file %s", config_file );
exit( EXIT_FAILURE );
}
-
+
osrfConfigSetDefaultConfig(cfg);
- jsonObject* configInfo = osrfConfigGetValueObject(NULL, "/router");
-
+ jsonObject* configInfo = osrfConfigGetValueObject(NULL, "/router");
+
if( configInfo->size < 1 || NULL == jsonObjectGetIndex( configInfo, 1 ) ) {
osrfLogError( OSRF_LOG_MARK, "No routers defined in config file %s, context \"%s\"",
config_file, context );
exit( EXIT_FAILURE );
}
-
- /* We're done with the command line now, */
- /* so we can safely overlay it */
-
+
+ /* We're done with the command line now, so we can safely overlay it */
+
init_proc_title( argc, argv );
set_proc_title( "OpenSRF Router" );
/* Spawn child process(es) */
-
- int i;
- for(i = 0; i < configInfo->size; i++) {
- jsonObject* configChunk = jsonObjectGetIndex(configInfo, i);
+
+ int i;
+ for(i = 0; i < configInfo->size; i++) {
+ jsonObject* configChunk = jsonObjectGetIndex(configInfo, i);
if( ! jsonObjectGetKey( configChunk, "transport" ) )
{
// In searching the configuration file for a given context, we may have found a
// spurious hit on an unrelated part of the configuration file that happened to use
// the same XML tag. In fact this happens routinely in practice.
-
+
// If we don't see a member for "transport" then this is presumably such a spurious
// hit, so we silently ignore it.
-
+
// It is also possible that it's the right part of the configuration file but it has a
// typo or other such error, making it look spurious. In that case, well, too bad.
continue;
}
- if(fork() == 0) { /* create a new child to run this router instance */
- setupRouter(configChunk);
+ if(fork() == 0) { /* create a new child to run this router instance */
+ setupRouter(configChunk);
break; /* We're a child; don't spawn any more children here */
}
- }
+ }
if( stop_signal ) {
// Interrupted by a signal? Re-raise so the parent can see it.
@@ -126,8 +122,8 @@
@brief Configure and run a child process.
@param configChunk Pointer to a subset of the loaded configuration.
- Configure oneself, daemonize, and then call osrfRouterRun() to go into an infinite
- loop. Do not return unless something goes wrong.
+ Configure oneself, daemonize, and then call osrfRouterRun() to go into a
+ near-endless loop. Return when interrupted by a signal, or when something goes wrong.
*/
static void setupRouter(jsonObject* configChunk) {
@@ -147,7 +143,7 @@
if(level) llevel = atoi(level);
if(!log_file)
- {
+ {
fprintf(stderr, "Log file name not specified for router\n");
return;
}
@@ -165,43 +161,45 @@
"user: %s resource: %s", server, port, username, resource );
int iport = 0;
- if(port) iport = atoi( port );
+ if(port)
+ iport = atoi( port );
osrfStringArray* tclients = osrfNewStringArray(4);
osrfStringArray* tservers = osrfNewStringArray(4);
- jsonObject* tclientsList = jsonObjectFindPath(configChunk, "/trusted_domains/client");
- jsonObject* tserversList = jsonObjectFindPath(configChunk, "/trusted_domains/server");
+ jsonObject* tclientsList = jsonObjectFindPath(configChunk, "/trusted_domains/client");
+ jsonObject* tserversList = jsonObjectFindPath(configChunk, "/trusted_domains/server");
int i;
- if(tserversList->type == JSON_ARRAY) {
- for( i = 0; i != tserversList->size; i++ ) {
- const char* serverDomain = jsonObjectGetString(jsonObjectGetIndex(tserversList, i));
- osrfLogInfo( OSRF_LOG_MARK, "Router adding trusted server: %s", serverDomain);
- osrfStringArrayAdd(tservers, serverDomain);
- }
- } else {
- const char* serverDomain = jsonObjectGetString(tserversList);
- osrfLogInfo( OSRF_LOG_MARK, "Router adding trusted server: %s", serverDomain);
- osrfStringArrayAdd(tservers, serverDomain);
- }
+ if(tserversList->type == JSON_ARRAY) {
+ for( i = 0; i != tserversList->size; i++ ) {
+ const char* serverDomain = jsonObjectGetString(jsonObjectGetIndex(tserversList, i));
+ osrfLogInfo( OSRF_LOG_MARK, "Router adding trusted server: %s", serverDomain);
+ osrfStringArrayAdd(tservers, serverDomain);
+ }
+ } else {
+ const char* serverDomain = jsonObjectGetString(tserversList);
+ osrfLogInfo( OSRF_LOG_MARK, "Router adding trusted server: %s", serverDomain);
+ osrfStringArrayAdd(tservers, serverDomain);
+ }
- if(tclientsList->type == JSON_ARRAY) {
- for( i = 0; i != tclientsList->size; i++ ) {
- const char* clientDomain = jsonObjectGetString(jsonObjectGetIndex(tclientsList, i));
- osrfLogInfo( OSRF_LOG_MARK, "Router adding trusted client: %s", clientDomain);
- osrfStringArrayAdd(tclients, clientDomain);
- }
- } else {
- const char* clientDomain = jsonObjectGetString(tclientsList);
- osrfLogInfo( OSRF_LOG_MARK, "Router adding trusted client: %s", clientDomain);
- osrfStringArrayAdd(tclients, clientDomain);
- }
+ if(tclientsList->type == JSON_ARRAY) {
+ for( i = 0; i != tclientsList->size; i++ ) {
+ const char* clientDomain = jsonObjectGetString(jsonObjectGetIndex(tclientsList, i));
+ osrfLogInfo( OSRF_LOG_MARK, "Router adding trusted client: %s", clientDomain);
+ osrfStringArrayAdd(tclients, clientDomain);
+ }
+ } else {
+ const char* clientDomain = jsonObjectGetString(tclientsList);
+ osrfLogInfo( OSRF_LOG_MARK, "Router adding trusted client: %s", clientDomain);
+ osrfStringArrayAdd(tclients, clientDomain);
+ }
if( tclients->size == 0 || tservers->size == 0 ) {
- osrfLogError( OSRF_LOG_MARK, "We need trusted servers and trusted client to run the router...");
+ osrfLogError( OSRF_LOG_MARK,
+ "We need trusted servers and trusted client to run the router...");
osrfStringArrayFree( tservers );
osrfStringArrayFree( tclients );
return;
@@ -209,7 +207,7 @@
router = osrfNewRouter( server,
username, resource, password, iport, tclients, tservers );
-
+
signal(SIGHUP,routerSignalHandler);
signal(SIGINT,routerSignalHandler);
signal(SIGTERM,routerSignalHandler);
@@ -220,18 +218,14 @@
return;
}
+ // Done configuring? Let's get to work.
+
daemonize();
osrfRouterRun( router );
- // Shouldn't get here, since osrfRouterRun()
- // should go into an infinite loop
-
osrfRouterFree(router);
router = NULL;
-
osrfLogInfo( OSRF_LOG_MARK, "Router freed" );
return;
}
-
-
More information about the opensrf-commits
mailing list