[Opensrf-commits] r1988 - in trunk: include/opensrf src/libopensrf (scottmk)

svn at svn.open-ils.org svn at svn.open-ils.org
Tue Aug 3 23:20:35 EDT 2010


Author: scottmk
Date: 2010-08-03 23:20:33 -0400 (Tue, 03 Aug 2010)
New Revision: 1988

Modified:
   trunk/include/opensrf/osrf_application.h
   trunk/src/libopensrf/osrf_application.c
Log:
Implement the chunking of OSRF messages.  I.e. bundle multiple
OSRF messages into an XMPP message, up to about 10k bytes, so
as to reduce networking overhead.

M    include/opensrf/osrf_application.h
M    src/libopensrf/osrf_application.c


Modified: trunk/include/opensrf/osrf_application.h
===================================================================
--- trunk/include/opensrf/osrf_application.h	2010-08-03 02:47:00 UTC (rev 1987)
+++ trunk/include/opensrf/osrf_application.h	2010-08-04 03:20:33 UTC (rev 1988)
@@ -4,6 +4,31 @@
 /**
 	@file osrf_application.h
 	@brief Routines to load and manage shared object libraries.
+
+	Every method of a service is implemented by a C function.  In a few cases those
+	functions are generic to all services.  In other cases they are loaded and executed from
+	a shared object library that is specific to the application offering the service.  A
+	registry maps method names to function names so that we can call the right function.
+
+	Each such function has a similar signature:
+
+		int method_name( osrfMethodContext* ctx );
+
+	The return value is negative in case of an error.  A return code of zero implies that
+	the method has already sent the client a STATUS message to say that it is finished.
+	A return code greater than zero implies that the method has not sent such a STATUS
+	message, so we need to do so after the method returns.
+
+	Any arguments passed to the method are bundled together in a jsonObject inside the
+	osrfMethodContext.
+
+	An application's shared object may also implement any or all of three standard functions:
+
+	- int osrfAppInitialize( void ) Called when an application is registered
+	- int osrfAppChildInit( void ) Called when a server drone is spawned
+	- void osrfAppChildExit( void ) Called when a server drone terminates
+
+	osrfAppInitialize() and osrfAppChildInit() return zero if successful, and non-zero if not.
 */
 
 #include <opensrf/utils.h>
@@ -68,18 +93,65 @@
 #define OSRF_METHOD_VERIFY_CONTEXT(d) _OSRF_METHOD_VERIFY_CONTEXT(d);
 #endif
 
+/**
+	@name Method options
+	@brief Macros that get OR'd together to form method options.
+*/
+/*@{*/
+/**
+	@brief  Marks a method as a system method.
+
+	System methods are implemented by generic functions, called via static linkage.  They
+	are not loaded or executed from shared objects.
+*/
 #define OSRF_METHOD_SYSTEM          1
+/**
+	@brief Notes that the method may return more than one result.
+
+	For a @em streaming method, we register both an atomic method and a non-atomic method.
+	See also OSRF_METHOD_ATOMIC.
+*/
 #define OSRF_METHOD_STREAMING       2
+/**
+	@brief  Combines all responses into a single RESULT message.
+
+	For a @em non-atomic method, the server returns each response to the client in a
+	separate RESULT message.  It sends a STATUS message at the end to signify the end of the
+	message stream.
+
+	For an @em atomic method, the server buffers all responses until the method returns,
+	and then sends them all at once in a single RESULT message (followed by a STATUS message).
+	Each individual response is encoded as an entry in a JSON array.  This buffering is
+	transparent to the function that implements the method.
+
+	Atomic methods incur less networking overhead than non-atomic methods, at the risk of
+	creating excessively large RESULT messages.  The HTTP gateway requires the atomic versions
+	of streaming methods because of the stateless nature of the HTTP protocol.
+
+	If OSRF_METHOD_STREAMING is set for a method, the application generates both an atomic
+	and a non-atomic method, whose names are identical except that the atomic one carries a
+	suffix of ".atomic".
+*/
 #define OSRF_METHOD_ATOMIC          4
+/**
+	@brief  Notes that a previous result to the same call may be available in memcache.
+
+	Before calling the registered function, a cachable method checks memcache for a previously
+	determined result for the same call.  If no such result is available, it calls the
+	registered function and caches the new result before returning.
+
+	This caching is not currently implemented for C methods.
+*/
 #define OSRF_METHOD_CACHABLE        8
+/*@}*/
 
 typedef struct {
-	char* name;                 /**< the method name. */
-	char* symbol;               /**< the symbol name (function name). */
-	char* notes;                /**< public method documentation. */
-	int argc;                   /**< how many args this method expects. */
+	char* name;                 /**< Method name. */
+	char* symbol;               /**< Symbol name (function name) within the shared object. */
+	char* notes;                /**< Public method documentation. */
+	int argc;                   /**< The minimum number of arguments for the method. */
 	//char* paramNotes;         /**< Description of the params expected for this method. */
-	int options;                /**< bitswitches setting various options for this method. */
+	int options;                /**< Bit switches setting various options for this method. */
 	void* userData;             /**< Opaque pointer to application-specific data. */
 
 	/*
@@ -91,35 +163,15 @@
 } osrfMethod;
 
 typedef struct {
-	osrfAppSession* session;    /**< the current session. */
-	osrfMethod* method;         /**< the requested method. */
-	jsonObject* params;         /**< the params to the method. */
-	int request;                /**< request id. */
-	jsonObject* responses;      /**< array of cached responses. */
+	osrfAppSession* session;    /**< Pointer to the current application session. */
+	osrfMethod* method;         /**< Pointer to the requested method. */
+	jsonObject* params;         /**< Parameters to the method. */
+	int request;                /**< Request id. */
+	jsonObject* responses;      /**< Array of cached responses. */
 } osrfMethodContext;
 
-/**
-	Register an application
-	@param appName The name of the application
-	@param soFile The library (.so) file that implements this application
-	@return 0 on success, -1 on error
-*/
 int osrfAppRegisterApplication( const char* appName, const char* soFile );
 
-/**
-	@brief Register a method for a given application.
-	
-	@param appName Name of the application that implements the method.
-	@param methodName The fully qualified name of the method.
-	@param symbolName The symbol name (function name) that implements the method.
-	@param notes Public documentation for this method.
-	@params argc The number of arguments this method expects.
-	@param options Bit switches setting various options.
-	@return 0 on success, -1 on error
-
-	Any method with  the OSRF_METHOD_STREAMING option set will have a ".atomic"
-	version of the method registered automatically.
-*/
 int osrfAppRegisterMethod( const char* appName, const char* methodName,
 		const char* symbolName, const char* notes, int argc, int options );
 
@@ -128,41 +180,19 @@
 
 osrfMethod* _osrfAppFindMethod( const char* appName, const char* methodName );
 
-/**
-	Runs the specified method for the specified application.
-	@param appName The name of the application who's method to run
-	@param methodName The name of the method to run
-	@param ses The app session attached to this request
-	@params reqId The request id for this request
-	@param params The method parameters
-*/
 int osrfAppRunMethod( const char* appName, const char* methodName,
 		osrfAppSession* ses, int reqId, jsonObject* params );
 
-/**
-	@brief Respond to the client with a method exception.
-	@param ses The current session.
-	@param request The request id.
-	@param msg The debug message to send to the client.
-	@return 0 on successfully sending of the message, -1 otherwise.
-*/
 int osrfAppRequestRespondException( osrfAppSession* ses, int request, const char* msg, ... );
 
 int osrfAppRespond( osrfMethodContext* context, const jsonObject* data );
+
 int osrfAppRespondComplete( osrfMethodContext* context, const jsonObject* data );
 
-/* OSRF_METHOD_ATOMIC and/or OSRF_METHOD_CACHABLE and/or 0 for no special options */
-//int osrfAppProcessMethodOptions( char* method );
-
-/** Tell the backend process to run its child init function */
 int osrfAppRunChildInit(const char* appname);
 
 void osrfAppRunExitCode( void );
 
-/**
-	Determine whether the context looks healthy.
-	Return 0 if it does, or -1 if it doesn't.
-*/
 int osrfMethodVerifyContext( osrfMethodContext* ctx );
 
 #ifdef __cplusplus

Modified: trunk/src/libopensrf/osrf_application.c
===================================================================
--- trunk/src/libopensrf/osrf_application.c	2010-08-03 02:47:00 UTC (rev 1987)
+++ trunk/src/libopensrf/osrf_application.c	2010-08-04 03:20:33 UTC (rev 1988)
@@ -48,6 +48,8 @@
 #define OSRF_SYSMETHOD_ECHO_ATOMIC              "opensrf.system.echo.atomic"
 /*@}*/
 
+#define OSRF_MSG_BUFFER_SIZE     10240
+
 /**
 	@brief Represent an Application.
 */
@@ -238,13 +240,12 @@
 
 	The @a options parameter is zero or more of the following macros, OR'd together:
 
-	- OSRF_METHOD_SYSTEM
-	- OSRF_METHOD_STREAMING
-	- OSRF_METHOD_ATOMIC
-	- OSRF_METHOD_CACHABLE
+	- OSRF_METHOD_SYSTEM        called by static linkage (shouldn't be used here)
+	- OSRF_METHOD_STREAMING     method may return more than one response
+	- OSRF_METHOD_ATOMIC        return all responses collected in a single RESULT message
+	- OSRF_METHOD_CACHABLE      cache results in memcache
 
-	If the OSRF_METHOD_STREAMING bit is set, also register an ".atomic" version of
-	the method.
+	If the OSRF_METHOD_STREAMING bit is set, also register an ".atomic" version of the method.
 */
 int osrfAppRegisterMethod( const char* appName, const char* methodName,
 		const char* symbolName, const char* notes, int argc, int options ) {
@@ -325,7 +326,7 @@
 		methodName = "";  // should never happen
 
 	if( options & OSRF_METHOD_ATOMIC ) {
-		// Append ".atomic" to the name, and make the method streaming
+		// Append ".atomic" to the name, and make the method atomic
 		char mb[ strlen( methodName ) + 8 ];
 		sprintf( mb, "%s.atomic", methodName );
 		method->name        = strdup( mb );
@@ -486,7 +487,11 @@
 		return osrfAppRequestRespondException(
 				ses, reqId, "An unknown server error occurred" );
 
-	return _osrfAppPostProcess( &context, retcode );
+	retcode = _osrfAppPostProcess( &context, retcode );
+
+	if( context.responses )
+		jsonObjectFree( context.responses );
+	return retcode;
 }
 
 /**
@@ -526,25 +531,64 @@
 }
 
 /**
+	@brief Send any response messages that have accumulated in the output buffer.
+	@param ses Pointer to the current application session.
+	@param outbuf Pointer to the output buffer.
+	@return Zero if successful, or -1 if not.
+
+	Used only by servers to respond to clients.
+*/
+static int flush_responses( osrfAppSession* ses, growing_buffer* outbuf ) {
+
+	// Collect any inbound traffic on the socket(s).  This doesn't accomplish anything for the
+	// immediate task at hand, but it may help to keep TCP from getting clogged in some cases.
+	osrf_app_session_queue_wait( ses, 0, NULL );
+
+	int rc = 0;
+	if( buffer_length( outbuf ) > 0 ) {    // If there's anything to send...
+		buffer_add_char( outbuf, ']' );    // Close the JSON array
+		if( osrfSendTransportPayload( ses, OSRF_BUFFER_C_STR( ses->outbuf ))) {
+			osrfLogError( OSRF_LOG_MARK, "Unable to flush response buffer" );
+			rc = -1;
+		}
+	}
+	buffer_reset( ses->outbuf );
+	return rc;
+}
+
+/**
+	@brief Append one serialized message to an output buffer.
+	@param outbuf Pointer to the output buffer.
+	@param msg Pointer to the message to be added, in the form of a JSON string.
+
+	The buffer accumulates a JSON array: an empty buffer gets a left bracket ahead of the
+	message, and a non-empty one gets a comma.  Do nothing if either pointer is NULL.
+
+	Used only by servers to respond to clients.
+*/
+static inline void append_msg( growing_buffer* outbuf, const char* msg ) {
+	if( !outbuf || !msg )
+		return;
+	// The first message opens the array; later ones are separated by commas.
+	buffer_add_char( outbuf, buffer_length( outbuf ) > 0 ? ',' : '[' );
+	buffer_add( outbuf, msg );
+}
+
+/**
 	@brief Either send or enqueue a response to a client, optionally with a completion notice.
 	@param ctx Pointer to the method context.
 	@param data Pointer to the response, in the form of a jsonObject.
 	@param complete Boolean: if true, we will accompany the RESULT message with a STATUS
 	message indicating that the response is complete.
-	@return Zero if successful, or -1 upon error.  The only recognized errors are if either
-	the @a ctx pointer or its method pointer is NULL.
+	@return Zero if successful, or -1 upon error.
 
 	For an atomic method, add a copy of the response data to a cache within the method
 	context, to be sent later.  In this case the @a complete parameter has no effect,
 	because we'll send the STATUS message later when we send the cached results.
 
-	If the method is cachable but not atomic, do nothing, ignoring the results in @a data.
-	Apparently there are no cachable methods at this writing.  If we ever invent some, we
-	may need to revisit this function.
-
-	If the method is neither atomic nor cachable, then send a RESULT message to the client,
-	with the results in @a data.  If @a complete is true, also send a STATUS message to
-	indicate that the response is complete.
+	If the method is not atomic, translate the message into JSON and append it to a buffer,
+	flushing the buffer as needed to avoid overflow.  If @a complete is true, append
+	a STATUS message (as JSON) to the buffer and flush the buffer.
 */
 static int _osrfAppRespond( osrfMethodContext* ctx, const jsonObject* data, int complete ) {
 	if(!(ctx && ctx->method)) return -1;
@@ -560,15 +604,55 @@
 		// Add a copy of the data object to the cache.
 		if ( data != NULL )
 			jsonObjectPush( ctx->responses, jsonObjectClone(data) );
-	}
+	} else {
+		osrfLogDebug( OSRF_LOG_MARK,
+			"Adding responses to stash for method %s", ctx->method->name );
 
-	if( !(ctx->method->options & OSRF_METHOD_ATOMIC ) &&
-			!(ctx->method->options & OSRF_METHOD_CACHABLE) ) {
+		if( data ) {
+			// If you want to flush the input buffers for every output message,
+			// this is the place to do it.
+			//osrf_app_session_queue_wait( ctx->session, 0, NULL );
 
-		if(complete)
-			osrfAppRequestRespondComplete( ctx->session, ctx->request, data );
-		else
-			osrfAppRequestRespond( ctx->session, ctx->request, data );
+			// Create an OSRF message
+			osrfMessage* msg = osrf_message_init( RESULT, ctx->request, 1 );
+			osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK );
+			osrf_message_set_result( msg, data );
+
+			// Serialize the OSRF message into JSON text
+			char* json = jsonObjectToJSON( osrfMessageToJSON( msg ));
+			osrfMessageFree( msg );
+
+			// If the new message would overflow the buffer, flush the output buffer first
+			int len_so_far = buffer_length( ctx->session->outbuf );
+			if( len_so_far && (strlen( json ) + len_so_far >= OSRF_MSG_BUFFER_SIZE - 3) ) {
+				if( flush_responses( ctx->session, ctx->session->outbuf ))
+					return -1;
+			}
+
+			// Append the JSON text to the output buffer
+			append_msg( ctx->session->outbuf, json );
+			free( json );
+		}
+
+		if(complete) {
+			// Create a STATUS message
+			osrfMessage* status_msg = osrf_message_init( STATUS, ctx->request, 1 );
+			osrf_message_set_status_info( status_msg, "osrfConnectStatus", "Request Complete",
+				OSRF_STATUS_COMPLETE );
+
+			// Serialize the STATUS message into JSON text
+			char* json = jsonObjectToJSON( osrfMessageToJSON( status_msg ));
+			osrfMessageFree( status_msg );
+
+			// Add the STATUS message to the output buffer.
+			// It's short, so don't worry about avoiding overflow.
+			append_msg( ctx->session->outbuf, json );
+			free( json );
+
+			// Flush the output buffer, sending any accumulated messages.
+			if( flush_responses( ctx->session, ctx->session->outbuf ))
+				return -1;
+		}
 	}
 
 	return 0;
@@ -598,8 +682,6 @@
 		// any responses yet).  Now send them all at once, followed by a STATUS message
 		// to say that we're finished.
 		osrfAppRequestRespondComplete( ctx->session, ctx->request, ctx->responses );
-		jsonObjectFree(ctx->responses);
-		ctx->responses = NULL;
 
 	} else {
 		// We have no cached responses to return.



More information about the opensrf-commits mailing list