[Opensrf-commits] r1988 - in trunk: include/opensrf src/libopensrf (scottmk)
svn at svn.open-ils.org
svn at svn.open-ils.org
Tue Aug 3 23:20:35 EDT 2010
Author: scottmk
Date: 2010-08-03 23:20:33 -0400 (Tue, 03 Aug 2010)
New Revision: 1988
Modified:
trunk/include/opensrf/osrf_application.h
trunk/src/libopensrf/osrf_application.c
Log:
Implement the chunking of OSRF messages. I.e. bundle multiple
OSRF messages into an XMPP message, up to about 10k bytes, so
as to reduce networking overhead.
M include/opensrf/osrf_application.h
M src/libopensrf/osrf_application.c
Modified: trunk/include/opensrf/osrf_application.h
===================================================================
--- trunk/include/opensrf/osrf_application.h 2010-08-03 02:47:00 UTC (rev 1987)
+++ trunk/include/opensrf/osrf_application.h 2010-08-04 03:20:33 UTC (rev 1988)
@@ -4,6 +4,31 @@
/**
@file osrf_application.h
@brief Routines to load and manage shared object libraries.
+
+ Every method of a service is implemented by a C function. In a few cases those
+ functions are generic to all services. In other cases they are loaded and executed from
+ a shared object library that is specific to the application offering the service, A
+ registry maps method names to function names so that we can call the right function.
+
+ Each such function has a similar signature:
+
+ int method_name( osrfMethodContext* ctx );
+
+ The return value is negative in case of an error. A return code of zero implies that
+ the method has already sent the client a STATUS message to say that it is finished.
+ A return code greater than zero implies that the method has not sent such a STATUS
+ message, so we need to do so after the method returns.
+
+ Any arguments passed to the method are bundled together in a jsonObject inside the
+ osrfMethodContext.
+
+ An application's shared object may also implement any or all of three standard functions:
+
+ - int osrfAppInitialize( void ) Called when an application is registered
+ - int osrfAppChildInit( void ) Called when a server drone is spawned
+ - void osrfAppChildExit( void ) Called when a server drone terminates
+
+ osrfAppInitialize() and osrfAppChild return zero if successful, and non-zero if not.
*/
#include <opensrf/utils.h>
@@ -68,18 +93,65 @@
#define OSRF_METHOD_VERIFY_CONTEXT(d) _OSRF_METHOD_VERIFY_CONTEXT(d);
#endif
+/**
+ @name Method options
+ @brief Macros that get OR'd together to form method options.
+*/
+/*@{*/
+/**
+ @brief Marks a method as a system method.
+
+ System methods are implemented by generic functions, called via static linkage. They
+ are not loaded or executed from shared objects.
+*/
#define OSRF_METHOD_SYSTEM 1
+/**
+ @brief Notes that the method may return more than one result.
+
+ For a @em streaming method, we register both an atomic method and a non-atomic method.
+ See also OSRF_METHOD_ATOMIC.
+*/
#define OSRF_METHOD_STREAMING 2
+/**
+ @brief Combines all responses into a single RESULT message.
+
+ For a @em non-atomic method, the server returns each response to the client in a
+ separate RESULT message. It sends a STATUS message at the end to signify the end of the
+ message stream.
+
+ For an @em atomic method, the server buffers all responses until the method returns,
+ and then sends them all at once in a single RESULT message (followed by a STATUS message).
+ Each individual response is encoded as an entry in a JSON array. This buffering is
+ transparent to the function that implements the method.
+
+ Atomic methods incur less networking overhead than non-atomic methods, at the risk of
+ creating excessively large RESULT messages. The HTTP gateway requires the atomic versions
+ of streaming methods because of the stateless nature of the HTTP protocol.
+
+ If OSRF_METHOD_STREAMING is set for a method, the application generates both an atomic
+ and a non-atomic method, whose names are identical except that the atomic one carries a
+ suffix of ".atomic".
+*/
#define OSRF_METHOD_ATOMIC 4
+/**
+ @brief Notes that a previous result to the same call may be available in memcache.
+
+ Before calling the registered function, a cachable method checks memcache for a previously
+ determined result for the same call. If no such result is available, it calls the
+ registered function and caches the new result before returning.
+
+ This caching is not currently implemented for C methods.
+*/
#define OSRF_METHOD_CACHABLE 8
+/*@}*/
typedef struct {
- char* name; /**< the method name. */
- char* symbol; /**< the symbol name (function name). */
- char* notes; /**< public method documentation. */
- int argc; /**< how many args this method expects. */
+ char* name; /**< Method name. */
+ char* symbol; /**< Symbol name (function name) within the shared object. */
+ char* notes; /**< Public method documentation. */
+ int argc; /**< The minimum number of arguments for the method. */
//char* paramNotes; /**< Description of the params expected for this method. */
- int options; /**< bitswitches setting various options for this method. */
+ int options; /**< Bit switches setting various options for this method. */
void* userData; /**< Opaque pointer to application-specific data. */
/*
@@ -91,35 +163,15 @@
} osrfMethod;
typedef struct {
- osrfAppSession* session; /**< the current session. */
- osrfMethod* method; /**< the requested method. */
- jsonObject* params; /**< the params to the method. */
- int request; /**< request id. */
- jsonObject* responses; /**< array of cached responses. */
+ osrfAppSession* session; /**< Pointer to the current application session. */
+ osrfMethod* method; /**< Pointer to the requested method. */
+ jsonObject* params; /**< Parameters to the method. */
+ int request; /**< Request id. */
+ jsonObject* responses; /**< Array of cached responses. */
} osrfMethodContext;
-/**
- Register an application
- @param appName The name of the application
- @param soFile The library (.so) file that implements this application
- @return 0 on success, -1 on error
-*/
int osrfAppRegisterApplication( const char* appName, const char* soFile );
-/**
- @brief Register a method for a given application.
-
- @param appName Name of the application that implements the method.
- @param methodName The fully qualified name of the method.
- @param symbolName The symbol name (function name) that implements the method.
- @param notes Public documentation for this method.
- @params argc The number of arguments this method expects.
- @param options Bit switches setting various options.
- @return 0 on success, -1 on error
-
- Any method with the OSRF_METHOD_STREAMING option set will have a ".atomic"
- version of the method registered automatically.
-*/
int osrfAppRegisterMethod( const char* appName, const char* methodName,
const char* symbolName, const char* notes, int argc, int options );
@@ -128,41 +180,19 @@
osrfMethod* _osrfAppFindMethod( const char* appName, const char* methodName );
-/**
- Runs the specified method for the specified application.
- @param appName The name of the application who's method to run
- @param methodName The name of the method to run
- @param ses The app session attached to this request
- @params reqId The request id for this request
- @param params The method parameters
-*/
int osrfAppRunMethod( const char* appName, const char* methodName,
osrfAppSession* ses, int reqId, jsonObject* params );
-/**
- @brief Respond to the client with a method exception.
- @param ses The current session.
- @param request The request id.
- @param msg The debug message to send to the client.
- @return 0 on successfully sending of the message, -1 otherwise.
-*/
int osrfAppRequestRespondException( osrfAppSession* ses, int request, const char* msg, ... );
int osrfAppRespond( osrfMethodContext* context, const jsonObject* data );
+
int osrfAppRespondComplete( osrfMethodContext* context, const jsonObject* data );
-/* OSRF_METHOD_ATOMIC and/or OSRF_METHOD_CACHABLE and/or 0 for no special options */
-//int osrfAppProcessMethodOptions( char* method );
-
-/** Tell the backend process to run its child init function */
int osrfAppRunChildInit(const char* appname);
void osrfAppRunExitCode( void );
-/**
- Determine whether the context looks healthy.
- Return 0 if it does, or -1 if it doesn't.
-*/
int osrfMethodVerifyContext( osrfMethodContext* ctx );
#ifdef __cplusplus
Modified: trunk/src/libopensrf/osrf_application.c
===================================================================
--- trunk/src/libopensrf/osrf_application.c 2010-08-03 02:47:00 UTC (rev 1987)
+++ trunk/src/libopensrf/osrf_application.c 2010-08-04 03:20:33 UTC (rev 1988)
@@ -48,6 +48,8 @@
#define OSRF_SYSMETHOD_ECHO_ATOMIC "opensrf.system.echo.atomic"
/*@}*/
+#define OSRF_MSG_BUFFER_SIZE 10240
+
/**
@brief Represent an Application.
*/
@@ -238,13 +240,12 @@
The @a options parameter is zero or more of the following macros, OR'd together:
- - OSRF_METHOD_SYSTEM
- - OSRF_METHOD_STREAMING
- - OSRF_METHOD_ATOMIC
- - OSRF_METHOD_CACHABLE
+ - OSRF_METHOD_SYSTEM called by static linkage (shouldn't be used here)
+ - OSRF_METHOD_STREAMING method may return more than one response
+ - OSRF_METHOD_ATOMIC return all responses collected in a single RESULT message
+ - OSRF_METHOD_CACHABLE cache results in memcache
- If the OSRF_METHOD_STREAMING bit is set, also register an ".atomic" version of
- the method.
+ If the OSRF_METHOD_STREAMING bit is set, also register an ".atomic" version of the method.
*/
int osrfAppRegisterMethod( const char* appName, const char* methodName,
const char* symbolName, const char* notes, int argc, int options ) {
@@ -325,7 +326,7 @@
methodName = ""; // should never happen
if( options & OSRF_METHOD_ATOMIC ) {
- // Append ".atomic" to the name, and make the method streaming
+ // Append ".atomic" to the name, and make the method atomic
char mb[ strlen( methodName ) + 8 ];
sprintf( mb, "%s.atomic", methodName );
method->name = strdup( mb );
@@ -486,7 +487,11 @@
return osrfAppRequestRespondException(
ses, reqId, "An unknown server error occurred" );
- return _osrfAppPostProcess( &context, retcode );
+ retcode = _osrfAppPostProcess( &context, retcode );
+
+ if( context.responses )
+ jsonObjectFree( context.responses );
+ return retcode;
}
/**
@@ -526,25 +531,64 @@
}
/**
+ @brief Send any response messages that have accumulated in the output buffer.
+ @param ses Pointer to the current application session.
+ @param outbuf Pointer to the output buffer.
+ @return Zero if successful, or -1 if not.
+
+ Used only by servers to respond to clients.
+*/
+static int flush_responses( osrfAppSession* ses, growing_buffer* outbuf ) {
+
+ // Collect any inbound traffic on the socket(s). This doesn't accomplish anything for the
+ // immediate task at hand, but it may help to keep TCP from getting clogged in some cases.
+ osrf_app_session_queue_wait( ses, 0, NULL );
+
+ int rc = 0;
+ if( buffer_length( outbuf ) > 0 ) { // If there's anything to send...
+ buffer_add_char( outbuf, ']' ); // Close the JSON array
+ if( osrfSendTransportPayload( ses, OSRF_BUFFER_C_STR( ses->outbuf ))) {
+ osrfLogError( OSRF_LOG_MARK, "Unable to flush response buffer" );
+ rc = -1;
+ }
+ }
+ buffer_reset( ses->outbuf );
+ return rc;
+}
+
+/**
+ @brief Add a message to an output buffer.
+ @param outbuf Pointer to the output buffer.
+ @param msg Pointer to the message to be added, in the form of a JSON string.
+
+ Since the output buffer is in the form of a JSON array, prepend a left bracket to the
+ first message, and a comma to subsequent ones.
+
+ Used only by servers to respond to clients.
+*/
+static inline void append_msg( growing_buffer* outbuf, const char* msg ) {
+ if( outbuf && msg ) {
+ char prefix = buffer_length( outbuf ) > 0 ? ',' : '[';
+ buffer_add_char( outbuf, prefix );
+ buffer_add( outbuf, msg );
+ }
+}
+
+/**
@brief Either send or enqueue a response to a client, optionally with a completion notice.
@param ctx Pointer to the method context.
@param data Pointer to the response, in the form of a jsonObject.
@param complete Boolean: if true, we will accompany the RESULT message with a STATUS
message indicating that the response is complete.
- @return Zero if successful, or -1 upon error. The only recognized errors are if either
- the @a ctx pointer or its method pointer is NULL.
+ @return Zero if successful, or -1 upon error.
For an atomic method, add a copy of the response data to a cache within the method
context, to be sent later. In this case the @a complete parameter has no effect,
because we'll send the STATUS message later when we send the cached results.
- If the method is cachable but not atomic, do nothing, ignoring the results in @a data.
- Apparently there are no cachable methods at this writing. If we ever invent some, we
- may need to revisit this function.
-
- If the method is neither atomic nor cachable, then send a RESULT message to the client,
- with the results in @a data. If @a complete is true, also send a STATUS message to
- indicate that the response is complete.
+ If the method is not atomic, translate the message into JSON and append it to a buffer,
+ flushing the buffer as needed to avoid overflow. If @a complete is true, append
+ a STATUS message (as JSON) to the buffer and flush the buffer.
*/
static int _osrfAppRespond( osrfMethodContext* ctx, const jsonObject* data, int complete ) {
if(!(ctx && ctx->method)) return -1;
@@ -560,15 +604,55 @@
// Add a copy of the data object to the cache.
if ( data != NULL )
jsonObjectPush( ctx->responses, jsonObjectClone(data) );
- }
+ } else {
+ osrfLogDebug( OSRF_LOG_MARK,
+ "Adding responses to stash for method %s", ctx->method->name );
- if( !(ctx->method->options & OSRF_METHOD_ATOMIC ) &&
- !(ctx->method->options & OSRF_METHOD_CACHABLE) ) {
+ if( data ) {
+ // If you want to flush the intput buffers for every output message,
+ // this is the place to do it.
+ //osrf_app_session_queue_wait( ctx->session, 0, NULL );
- if(complete)
- osrfAppRequestRespondComplete( ctx->session, ctx->request, data );
- else
- osrfAppRequestRespond( ctx->session, ctx->request, data );
+ // Create an OSRF message
+ osrfMessage* msg = osrf_message_init( RESULT, ctx->request, 1 );
+ osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK );
+ osrf_message_set_result( msg, data );
+
+ // Serialize the OSRF message into JSON text
+ char* json = jsonObjectToJSON( osrfMessageToJSON( msg ));
+ osrfMessageFree( msg );
+
+ // If the new message would overflow the buffer, flush the output buffer first
+ int len_so_far = buffer_length( ctx->session->outbuf );
+ if( len_so_far && (strlen( json ) + len_so_far >= OSRF_MSG_BUFFER_SIZE - 3) ) {
+ if( flush_responses( ctx->session, ctx->session->outbuf ))
+ return -1;
+ }
+
+ // Append the JSON text to the output buffer
+ append_msg( ctx->session->outbuf, json );
+ free( json );
+ }
+
+ if(complete) {
+ // Create a STATUS message
+ osrfMessage* status_msg = osrf_message_init( STATUS, ctx->request, 1 );
+ osrf_message_set_status_info( status_msg, "osrfConnectStatus", "Request Complete",
+ OSRF_STATUS_COMPLETE );
+
+ // Serialize the STATUS message into JSON text
+ char* json = jsonObjectToJSON( osrfMessageToJSON( status_msg ));
+ osrfMessageFree( status_msg );
+
+ // Add the STATUS message to the output buffer.
+ // It's short, so don't worry about avoiding overflow.
+ append_msg( ctx->session->outbuf, json );
+ free( json );
+
+ // Flush the output buffer, sending any accumulated messages.
+ if( flush_responses( ctx->session, ctx->session->outbuf ))
+ return -1;
+ }
}
return 0;
@@ -598,8 +682,6 @@
// any responses yet). Now send them all at once, followed by a STATUS message
// to say that we're finished.
osrfAppRequestRespondComplete( ctx->session, ctx->request, ctx->responses );
- jsonObjectFree(ctx->responses);
- ctx->responses = NULL;
} else {
// We have no cached responses to return.
More information about the opensrf-commits
mailing list