[OpenSRF-GIT] OpenSRF branch master updated. d7e9df6838f1c9a72db3fd41556d178cfe7f6700

Evergreen Git git at git.evergreen-ils.org
Fri Jan 4 14:08:11 EST 2019


This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "OpenSRF".

The branch, master has been updated
       via  d7e9df6838f1c9a72db3fd41556d178cfe7f6700 (commit)
       via  efa9b713d5341458a3afaa26d1cf9e750fa78654 (commit)
       via  848843b1cda6d79d3aec589746abf7a8efdc313d (commit)
       via  60bcf74ecc5a5c7d0740d7054e829eef19828091 (commit)
       via  a9da9c7b25ef90276d41ba84ba8ccf0ea856da1b (commit)
       via  1abec0da61a46fa5a4a881c29a2e95a51e9013fa (commit)
       via  0201ca954002eb241d277c3068659bb1f8100bab (commit)
      from  f3eab1715079243d541dc12fd90db005630ffec9 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit d7e9df6838f1c9a72db3fd41556d178cfe7f6700
Author: Galen Charlton <gmc at equinoxinitiative.org>
Date:   Wed Dec 12 14:35:56 2018 -0500

    LP#1729610: extend backlog queue to C apps
    
    This patch extends the notion of a backlog queue to C apps and
    offers the same functionality as the Perl side of the patch series:
    
    - max_backlog_queue configuration setting
    - ability to queue messages up to the configured limit
    - ability to drop requests that would overflow the backlog
      queue and send status 503 exceptions back to the client.
    
    This patch also adds a new service, opensrf.cslow, that implements
    an opensrf.cslow.wait method similar to the Perl opensrf.slooooooow
    service.
    
    To test
    -------
    [1] Set a low max_backlog_queue for opensrf.cslow and a low
        max_children.
    [2] Arrange for srfsh to fire off a bunch of opensrf.cslow.wait
        requests.
    [3] Verify that requests that come in after the backlog queue fills
        up immediately get 503 exceptions.
    
    Signed-off-by: Galen Charlton <gmc at equinoxinitiative.org>
    Signed-off-by: Bill Erickson <berickxx at gmail.com>
    Signed-off-by: Mike Rylander <mrylander at gmail.com>

diff --git a/examples/opensrf.xml.example b/examples/opensrf.xml.example
index 7157787..c4be049 100644
--- a/examples/opensrf.xml.example
+++ b/examples/opensrf.xml.example
@@ -165,6 +165,24 @@ vim:et:ts=2:sw=2:
         </unix_config>
       </opensrf.dbmath>
 
+      <opensrf.cslow>
+        <keepalive>3</keepalive>
+        <stateless>1</stateless>
+        <language>c</language>
+        <implementation>libosrf_cslow.so</implementation>
+        <unix_config>
+          <max_requests>1000</max_requests>
+          <unix_log>opensrf.cslow_unix.log</unix_log>
+          <unix_sock>opensrf.cslow_unix.sock</unix_sock>
+          <unix_pid>opensrf.cslow_unix.pid</unix_pid>
+          <min_children>5</min_children>
+          <max_children>15</max_children>
+          <min_spare_children>2</min_spare_children>
+          <max_spare_children>5</max_spare_children>
+          <max_backlog_queue>10</max_backlog_queue>
+        </unix_config>
+      </opensrf.cslow>
+
       <opensrf.settings>
         <keepalive>1</keepalive>
         <stateless>1</stateless>
@@ -262,6 +280,7 @@ vim:et:ts=2:sw=2:
         <appname>opensrf.dbmath</appname>
         <appname>opensrf.validator</appname>
         <appname>opensrf.slooooooow</appname>
+        <appname>opensrf.cslow</appname>
       </activeapps>
 
       <apps>
diff --git a/src/c-apps/Makefile.am b/src/c-apps/Makefile.am
index 54c3cac..9138ff6 100644
--- a/src/c-apps/Makefile.am
+++ b/src/c-apps/Makefile.am
@@ -18,11 +18,15 @@ AM_LDFLAGS = $(DEF_LDFLAGS) -L at top_builddir@/src/libopensrf
 DISTCLEANFILES = Makefile.in Makefile
 
 noinst_PROGRAMS = timejson
-lib_LTLIBRARIES = libosrf_dbmath.la libosrf_math.la libosrf_version.la
+lib_LTLIBRARIES = libosrf_cslow.la libosrf_dbmath.la libosrf_math.la libosrf_version.la
 
 timejson_SOURCES = timejson.c
 timejson_LDADD = @top_builddir@/src/libopensrf/libopensrf.la
 
+libosrf_cslow_la_SOURCES = osrf_cslow.c
+libosrf_cslow_la_LDFLAGS = $(AM_LDFLAGS) -module -version-info 2:0:2
+libosrf_cslow_la_LIBADD = @top_builddir@/src/libopensrf/libopensrf.la
+
 libosrf_dbmath_la_SOURCES = osrf_dbmath.c 
 libosrf_dbmath_la_LDFLAGS = $(AM_LDFLAGS) -module -version-info 2:0:2
 libosrf_dbmath_la_LIBADD = @top_builddir@/src/libopensrf/libopensrf.la
diff --git a/src/c-apps/osrf_cslow.c b/src/c-apps/osrf_cslow.c
new file mode 100644
index 0000000..108977a
--- /dev/null
+++ b/src/c-apps/osrf_cslow.c
@@ -0,0 +1,58 @@
+#include <opensrf/osrf_app_session.h>
+#include <opensrf/osrf_application.h>
+#include <opensrf/osrf_json.h>
+#include <opensrf/log.h>
+
+#define MODULENAME "opensrf.cslow"
+
+int osrfAppInitialize();
+int osrfAppChildInit();
+int osrfCSlowWait( osrfMethodContext* );
+
+
+int osrfAppInitialize() {
+
+	osrfAppRegisterMethod( 
+			MODULENAME, 
+			"opensrf.cslow.wait", 
+			"osrfCSlowWait", 
+			"Wait specified number of seconds, then return that number", 1, 0 );
+
+	return 0;
+}
+
+int osrfAppChildInit() {
+	return 0;
+}
+
+int osrfCSlowWait( osrfMethodContext* ctx ) {
+	if( osrfMethodVerifyContext( ctx ) ) {
+		osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
+		return -1;
+	}
+
+	const jsonObject* x = jsonObjectGetIndex(ctx->params, 0);
+
+	if( x ) {
+
+		char* a = jsonObjectToSimpleString(x);
+
+		if( a ) {
+
+			unsigned int pause = atoi(a);
+			sleep(pause);
+
+			jsonObject* resp = jsonNewNumberObject(pause);
+			osrfAppRespondComplete( ctx, resp );
+			jsonObjectFree(resp);
+
+			free(a);
+			return 0;
+		}
+	}
+
+	return -1;
+}
+
+
+
diff --git a/src/libopensrf/osrf_prefork.c b/src/libopensrf/osrf_prefork.c
index f02f635..a9f7c42 100644
--- a/src/libopensrf/osrf_prefork.c
+++ b/src/libopensrf/osrf_prefork.c
@@ -48,6 +48,7 @@ typedef struct {
 	int max_requests;     /**< How many requests a child processes before terminating. */
 	int min_children;     /**< Minimum number of children to maintain. */
 	int max_children;     /**< Maximum number of children to maintain. */
+	int max_backlog_queue; /**< Maximum size of backlog queue. */
 	int fd;               /**< Unused. */
 	int data_to_child;    /**< Unused. */
 	int data_to_parent;   /**< Unused. */
@@ -86,7 +87,7 @@ typedef struct prefork_child_struct prefork_child;
 static volatile sig_atomic_t child_dead;
 
 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
-	int max_requests, int min_children, int max_children );
+	int max_requests, int min_children, int max_children, int max_backlog_queue );
 static prefork_child* launch_child( prefork_simple* forker );
 static void prefork_launch_children( prefork_simple* forker );
 static void prefork_run( prefork_simple* forker );
@@ -137,6 +138,7 @@ int osrf_prefork_run( const char* appname ) {
 
 	int maxr = 1000;
 	int maxc = 10;
+	int maxbq = 1000;
 	int minc = 3;
 	int kalive = 5;
 
@@ -146,6 +148,7 @@ int osrf_prefork_run( const char* appname ) {
 	char* max_req      = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname );
 	char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname );
 	char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname );
+	char* max_backlog_queue = osrf_settings_host_value( "/apps/%s/unix_config/max_backlog_queue", appname );
 	char* keepalive    = osrf_settings_host_value( "/apps/%s/keepalive", appname );
 
 	if( !keepalive )
@@ -168,10 +171,16 @@ int osrf_prefork_run( const char* appname ) {
 	else
 		maxc = atoi( max_children );
 
+	if( !max_backlog_queue )
+		osrfLogWarning( OSRF_LOG_MARK, "Max backlog queue size not defined, assuming %d", maxbq );
+	else
+		maxbq = atoi( max_backlog_queue );
+
 	free( keepalive );
 	free( max_req );
 	free( min_children );
 	free( max_children );
+	free( max_backlog_queue );
 	/* --------------------------------------------------- */
 
 	char* resc = va_list_to_string( "%s_listener", appname );
@@ -187,7 +196,7 @@ int osrf_prefork_run( const char* appname ) {
 
 	prefork_simple forker;
 
-	if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) {
+	if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc, maxbq )) {
 		osrfLogError( OSRF_LOG_MARK,
 			"osrf_prefork_run() failed to create prefork_simple object" );
 		return -1;
@@ -522,10 +531,11 @@ static int prefork_child_process_request( prefork_child* child, char* data ) {
 			before terminating.
 	@param min_children Minimum number of child processes to maintain.
 	@param max_children Maximum number of child processes to maintain.
+	@param max_backlog_queue Maximum size of backlog queue.
 	@return 0 if successful, or 1 if not (due to invalid parameters).
 */
 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
-		int max_requests, int min_children, int max_children ) {
+		int max_requests, int min_children, int max_children, int max_backlog_queue ) {
 
 	if( min_children > max_children ) {
 		osrfLogError( OSRF_LOG_MARK,  "min_children (%d) is greater "
@@ -546,6 +556,7 @@ static int prefork_simple_init( prefork_simple* prefork, transport_client* clien
 	prefork->max_requests = max_requests;
 	prefork->min_children = min_children;
 	prefork->max_children = max_children;
+	prefork->max_backlog_queue = max_backlog_queue;
 	prefork->fd           = 0;
 	prefork->data_to_child = 0;
 	prefork->data_to_parent = 0;
@@ -850,6 +861,16 @@ static void prefork_run( prefork_simple* forker ) {
 
 	transport_message* cur_msg = NULL;
 
+	// The backlog queue accumulates messages received while there
+	// are not yet children available to process them. While the
+	// transport client maintains its own queue of messages, sweeping
+	// the transport client's queue in the backlog queue gives us the
+	// ability to set a limit on the size of the backlog queue (and
+	// then to drop messages once the backlog queue has filled up)
+	transport_message* backlog_queue_head = NULL;
+	transport_message* backlog_queue_tail = NULL;
+	int backlog_queue_size = 0;
+
 	while( 1 ) {
 
 		if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
@@ -857,45 +878,98 @@ static void prefork_run( prefork_simple* forker ) {
 			return;
 		}
 
-		// Wait indefinitely for an input message
-		osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
-		cur_msg = client_recv( forker->connection, -1 );
+		int received_from_network = 0;
+		if ( backlog_queue_size == 0 ) {
+			// Wait indefinitely for an input message
+			osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
+			cur_msg = client_recv( forker->connection, -1 );
+			received_from_network = 1;
+		} else {
+			// See if any messages are immediately available
+			cur_msg = client_recv( forker->connection, 0 );
+			if ( cur_msg != NULL )
+				received_from_network = 1;
+		}
 
-		if( cur_msg == NULL ) {
-			// most likely a signal was received.  clean up any recently
-			// deceased children and try again.
-			if(child_dead)
-				reap_children(forker);
-			continue;
-        }
+		if (received_from_network) {
+			if( cur_msg == NULL ) {
+				// most likely a signal was received.  clean up any recently
+				// deceased children and try again.
+				if(child_dead)
+					reap_children(forker);
+				continue;
+			}
 
-        if (cur_msg->error_type) {
-            osrfLogInfo(OSRF_LOG_MARK, 
-                "Listener received an XMPP error message.  "
-                "Likely a bounced message. sender=%s", cur_msg->sender);
-            if(child_dead)
-                reap_children(forker);
-            continue;
-        }
+			if (cur_msg->error_type) {
+				osrfLogInfo(OSRF_LOG_MARK,
+					"Listener received an XMPP error message.  "
+					"Likely a bounced message. sender=%s", cur_msg->sender);
+				if(child_dead)
+					reap_children(forker);
+				continue;
+			}
 
-		message_prepare_xml( cur_msg );
-		const char* msg_data = cur_msg->msg_xml;
-		if( ! msg_data || ! *msg_data ) {
-			osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
-				(msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
-			message_free( cur_msg );
-			continue;       // Message not usable; go on to the next one.
+			message_prepare_xml( cur_msg );
+			const char* msg_data = cur_msg->msg_xml;
+			if( ! msg_data || ! *msg_data ) {
+				osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
+					(msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
+				message_free( cur_msg );
+				continue;       // Message not usable; go on to the next one.
+			}
+
+			// stick message onto queue
+			cur_msg->next = NULL;
+			if (backlog_queue_size == 0) {
+				backlog_queue_head = cur_msg;
+				backlog_queue_tail = cur_msg;
+			} else {
+				if (backlog_queue_size >= forker->max_backlog_queue) {
+					osrfLogWarning ( OSRF_LOG_MARK, "Reached backlog queue limit of %d; dropping "
+						"latest message",
+						forker->max_backlog_queue );
+					osrfMessage* err = osrf_message_init( STATUS, 1, 1 );
+					osrf_message_set_status_info( err, "osrfMethodException",
+						"Service unavailable: no available children and backlog queue at limit",
+						OSRF_STATUS_SERVICEUNAVAILABLE );
+					char *data = osrf_message_serialize( err );
+					osrfMessageFree( err );
+					transport_message* tresponse = message_init( data, "", cur_msg->thread, cur_msg->router_from, cur_msg->recipient );
+					message_set_osrf_xid(tresponse, cur_msg->osrf_xid);
+					free( data );
+					transport_client* client = osrfSystemGetTransportClient();
+					client_send_message( client, tresponse );
+					message_free( tresponse );
+					message_free(cur_msg);
+					continue;
+				}
+				backlog_queue_tail->next = cur_msg;
+				backlog_queue_tail = cur_msg;
+				osrfLogWarning( OSRF_LOG_MARK, "Adding message to non-empty backlog queue." );
+			}
+			backlog_queue_size++;
 		}
 
+		if (backlog_queue_size == 0) {
+			// strictly speaking, this check may be redundant, but
+			// from this point forward we can be sure that the
+			// backlog queue has at least one message in it and
+			// that if we can find a child to process it, we want to
+			// process the head of that queue.
+			continue;
+		}
+
+		cur_msg = backlog_queue_head;
+
 		int honored = 0;     /* will be set to true when we service the request */
 		int no_recheck = 0;
 
 		while( ! honored ) {
 
-            if( !no_recheck ) {
-                if(check_children( forker, 0 ) < 0) {
+			if( !no_recheck ) {
+				if(check_children( forker, 0 ) < 0) {
                     continue; // check failed, try again
-                }
+				}
             }
             no_recheck = 0;
 
@@ -924,6 +998,7 @@ static void prefork_run( prefork_simple* forker ) {
 				osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
 					cur_child->write_data_fd );
 
+				const char* msg_data = cur_msg->msg_xml;
 				int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
 				if( written < 0 ) {
 					// This child appears to be dead or unusable.  Discard it.
@@ -958,6 +1033,7 @@ static void prefork_run( prefork_simple* forker ) {
 						osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
 							new_child->write_data_fd, new_child->pid );
 
+						const char* msg_data = cur_msg->msg_xml;
 						int written = write(
 							new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
 						if( written < 0 ) {
@@ -980,20 +1056,21 @@ static void prefork_run( prefork_simple* forker ) {
                 }
             }
 
-			if( !honored ) {
-				osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting..." );
-				if( check_children( forker, 1 ) >= 0 ) {
-				    // Tell the loop not to call check_children again, since we just successfully called it
-				    no_recheck = 1;
-                }
-			}
-
 			if( child_dead )
 				reap_children( forker );
 
+			if( !honored ) {
+				break;
+			}
+
 		} // end while( ! honored )
 
-		message_free( cur_msg );
+		if ( honored ) {
+			backlog_queue_head = cur_msg->next;
+			backlog_queue_size--;
+			cur_msg->next = NULL;
+			message_free( cur_msg );
+		}
 
 	} /* end top level listen loop */
 }

commit efa9b713d5341458a3afaa26d1cf9e750fa78654
Author: Galen Charlton <gmc at equinoxinitiative.org>
Date:   Wed Nov 7 12:03:17 2018 -0500

    LP#1729610: return new OpenSRF status if backlog queue fills up
    
    This patch teaches Perl services how to return a new OpenSRF status,
    OSRF_STATUS_SERVICEUNAVAILABLE (code 503) if the backlog queue
    for a service gets full.
    
    To test
    -------
    [1] Set a low max_backlog_queue for opensrf.slooooooow and a low
        max_children.
    [2] Arrange for srfsh to fire off a bunch of opensrf.slooooooow.wait
        requests.
    [3] Verify that requests that come in after the backlog queue fills
        up immediately get 503 exceptions.
    
    Signed-off-by: Galen Charlton <gmc at equinoxinitiative.org>
    Signed-off-by: Bill Erickson <berickxx at gmail.com>
    Signed-off-by: Mike Rylander <mrylander at gmail.com>

diff --git a/include/opensrf/osrf_message.h b/include/opensrf/osrf_message.h
index 1785adf..874c310 100644
--- a/include/opensrf/osrf_message.h
+++ b/include/opensrf/osrf_message.h
@@ -53,6 +53,7 @@ extern "C" {
 
 #define OSRF_STATUS_INTERNALSERVERERROR  500
 #define OSRF_STATUS_NOTIMPLEMENTED       501
+#define OSRF_STATUS_SERVICEUNAVAILABLE   503
 #define OSRF_STATUS_VERSIONNOTSUPPORTED  505
 
 
diff --git a/src/javascript/opensrf.js b/src/javascript/opensrf.js
index e18efa5..03c79a9 100644
--- a/src/javascript/opensrf.js
+++ b/src/javascript/opensrf.js
@@ -58,6 +58,7 @@ var OSRF_STATUS_TIMEOUT = 408;
 var OSRF_STATUS_EXPFAILED = 417;
 var OSRF_STATUS_INTERNALSERVERERROR = 500;
 var OSRF_STATUS_NOTIMPLEMENTED = 501;
+var OSRF_STATUS_SERVICEUNAVAILABLE = 503;
 var OSRF_STATUS_VERSIONNOTSUPPORTED = 505;
 
 // TODO: get path from ./configure prefix
diff --git a/src/perl/lib/OpenSRF/DomainObject/oilsResponse.pm b/src/perl/lib/OpenSRF/DomainObject/oilsResponse.pm
index 874a4ae..80fc7d2 100644
--- a/src/perl/lib/OpenSRF/DomainObject/oilsResponse.pm
+++ b/src/perl/lib/OpenSRF/DomainObject/oilsResponse.pm
@@ -12,6 +12,7 @@ BEGIN {
 					STATUS_BADREQUEST STATUS_UNAUTHORIZED STATUS_FORBIDDEN
 					STATUS_NOTFOUND STATUS_NOTALLOWED STATUS_TIMEOUT
 					STATUS_INTERNALSERVERERROR STATUS_NOTIMPLEMENTED
+					STATUS_SERVICEUNAVAILABLE
 					STATUS_VERSIONNOTSUPPORTED STATUS_REDIRECTED 
 					STATUS_EXPFAILED STATUS_COMPLETE STATUS_PARTIAL
 					STATUS_NOCONTENT/;
@@ -21,6 +22,7 @@ BEGIN {
 					STATUS_BADREQUEST STATUS_UNAUTHORIZED STATUS_FORBIDDEN
 					STATUS_NOTFOUND STATUS_NOTALLOWED STATUS_TIMEOUT
 					STATUS_INTERNALSERVERERROR STATUS_NOTIMPLEMENTED
+					STATUS_SERVICEUNAVAILABLE
 					STATUS_VERSIONNOTSUPPORTED STATUS_REDIRECTED 
 					STATUS_EXPFAILED STATUS_COMPLETE STATUS_PARTIAL
 					STATUS_NOCONTENT/ ],
@@ -72,6 +74,7 @@ sub STATUS_EXPFAILED		{ return 417 }
 
 sub STATUS_INTERNALSERVERERROR	{ return 500 }
 sub STATUS_NOTIMPLEMENTED			{ return 501 }
+sub STATUS_SERVICEUNAVAILABLE	{ return 503 }
 sub STATUS_VERSIONNOTSUPPORTED	{ return 505 }
 
 my $log = 'OpenSRF::Utils::Logger';
diff --git a/src/perl/lib/OpenSRF/Server.pm b/src/perl/lib/OpenSRF/Server.pm
index 0d31fcf..49ede59 100644
--- a/src/perl/lib/OpenSRF/Server.pm
+++ b/src/perl/lib/OpenSRF/Server.pm
@@ -21,6 +21,7 @@ use OpenSRF::Utils::Config;
 use OpenSRF::Transport::PeerHandle;
 use OpenSRF::Utils::SettingsClient;
 use OpenSRF::Utils::Logger qw($logger);
+use OpenSRF::DomainObject::oilsResponse qw/:status/;
 use OpenSRF::Transport::SlimJabber::Client;
 use Encode;
 use POSIX qw/:sys_wait_h :errno_h/;
@@ -227,8 +228,47 @@ sub run {
                         }
                     }
                 } else {
-                    # We'll just have to wait
-                    $self->check_status(1); # block until child is available
+
+                    if (!$from_network) {
+                        # The queue is full, and we just requeued a message. We'll
+                        # now see if there is a request available from the network;
+                        # if so, we'll see if a child is available again or else
+                        # drop it
+                        $msg = $self->{osrf_handle}->process($wait_time);
+                        if ($msg) {
+                            $self->check_status();
+                            if (@{$self->{idle_list}}) {
+                                # child now available, so we'll go ahead and queue it
+                                $chatty and $logger->debug("server: queuing new message after a re-queue with a full queue");
+                                push @max_children_msg_queue, $msg;
+                            } else {
+                                # ok, need to drop this one
+                                my $resp = OpenSRF::DomainObject::oilsMessage->new();
+                                $resp->type('STATUS');
+                                $resp->payload(
+                                    OpenSRF::DomainObject::oilsMethodException->new(
+                                        status => "Service unavailable: no available children and backlog queue at limit",
+                                        statusCode => STATUS_SERVICEUNAVAILABLE
+                                    )
+                                );
+                                $resp->threadTrace(1);
+
+                                $logger->set_osrf_xid($msg->osrf_xid);
+                                $self->{osrf_handle}->send(
+                                    to => $msg->from,
+                                    osrf_xid => $msg->osrf_xid, # Note that this is ignored, which
+                                                                # is why we called $logger->set_osrf_xid above.
+                                                                # We probably don't want that to be necessary
+                                                                # if osrf_xid is explicitly set here, but that'll
+                                                                # be a FIXME for later
+                                    thread => $msg->thread,
+                                    body => OpenSRF::Utils::JSON->perl2JSON([ $resp ])
+                                );
+                                $logger->warn("Backlog queue full for $self->{service}; forced to drop message " .
+                                              $msg->thread . " from " . $msg->from);
+                            }
+                        }
+                    }
                 }
             }
 
diff --git a/src/python/osrf/const.py b/src/python/osrf/const.py
index d888b75..297b1b3 100644
--- a/src/python/osrf/const.py
+++ b/src/python/osrf/const.py
@@ -63,6 +63,7 @@ OSRF_STATUS_TIMEOUT                  = 408
 OSRF_STATUS_EXPFAILED                = 417
 OSRF_STATUS_INTERNALSERVERERROR      = 500
 OSRF_STATUS_NOTIMPLEMENTED           = 501
+OSRF_STATUS_SERVICEUNAVAILABLE       = 503
 OSRF_STATUS_VERSIONNOTSUPPORTED      = 505
 
 

commit 848843b1cda6d79d3aec589746abf7a8efdc313d
Author: Galen Charlton <gmc at equinoxinitiative.org>
Date:   Tue Nov 6 15:14:55 2018 -0500

    LP#1729610: make it possible to set max_backlog_queue in opensrf.xml
    
    This patch adds a new service configuration option, max_backlog_queue,
    to allow controlling the size of the backlog queue.
    
    If not otherwise specified in opensrf.xml, max_backlog_queue defaults
    to 1000.
    
    Signed-off-by: Galen Charlton <gmc at equinoxinitiative.org>
    Signed-off-by: Bill Erickson <berickxx at gmail.com>
    Signed-off-by: Mike Rylander <mrylander at gmail.com>

diff --git a/examples/opensrf.xml.example b/examples/opensrf.xml.example
index 2e52c7a..7157787 100644
--- a/examples/opensrf.xml.example
+++ b/examples/opensrf.xml.example
@@ -198,6 +198,7 @@ vim:et:ts=2:sw=2:
           <max_children>5</max_children>
           <min_spare_children>1</min_spare_children>
           <max_spare_children>2</max_spare_children>
+          <max_backlog_queue>10</max_backlog_queue>
         </unix_config>
       </opensrf.slooooooow>
 
diff --git a/src/perl/lib/OpenSRF/System.pm b/src/perl/lib/OpenSRF/System.pm
index c9534dc..ece232f 100644
--- a/src/perl/lib/OpenSRF/System.pm
+++ b/src/perl/lib/OpenSRF/System.pm
@@ -113,6 +113,7 @@ sub run_service {
         min_children =>  $getval->(unix_config => 'min_children') || 1,
         min_spare_children =>  $getval->(unix_config => 'min_spare_children'),
         max_spare_children =>  $getval->(unix_config => 'max_spare_children'),
+        max_backlog_queue =>  $getval->(unix_config => 'max_backlog_queue'),
         stderr_log_path => $stderr_path
     );
 

commit 60bcf74ecc5a5c7d0740d7054e829eef19828091
Author: Remington Steed <rjs7 at calvin.edu>
Date:   Wed Jan 31 16:05:52 2018 -0500

    LP#1729610: Fix incorrect param description
    
    Mike confirmed that the code expects "pause" to be an integer. This
    commit changes the description to reflect that, and to mention the
    default value.
    
    Signed-off-by: Remington Steed <rjs7 at calvin.edu>
    Signed-off-by: Galen Charlton <gmc at equinoxinitiative.org>
    Signed-off-by: Bill Erickson <berickxx at gmail.com>
    Signed-off-by: Mike Rylander <mrylander at gmail.com>

diff --git a/src/perl/lib/OpenSRF/Application/Slooooooow.pm b/src/perl/lib/OpenSRF/Application/Slooooooow.pm
index bf218ce..7942f1e 100644
--- a/src/perl/lib/OpenSRF/Application/Slooooooow.pm
+++ b/src/perl/lib/OpenSRF/Application/Slooooooow.pm
@@ -34,7 +34,7 @@ __PACKAGE__->register_method(
     argc            => 1,
     signature       => {
         params => [
-            {name => "pause", type => "number", desc => "Seconds to sleep, can be fractional"},
+            {name => "pause", type => "int", desc => "Seconds to sleep (integer, default is 1)"},
             {name => "extra", type => "string", desc => "Extra optional parameter used to inflate the payload size"}
         ],
         return => {

commit a9da9c7b25ef90276d41ba84ba8ccf0ea856da1b
Author: Mike Rylander <mrylander at gmail.com>
Date:   Tue Oct 24 16:45:09 2017 -0400

    LP#1729610: Add some debug/internal logging to backlog queue
    
    Signed-off-by: Mike Rylander <mrylander at gmail.com>
    Signed-off-by: Galen Charlton <gmc at equinoxinitiative.org>
    Signed-off-by: Bill Erickson <berickxx at gmail.com>
    Signed-off-by: Mike Rylander <mrylander at gmail.com>

diff --git a/src/perl/lib/OpenSRF/Server.pm b/src/perl/lib/OpenSRF/Server.pm
index 0eaac40..0d31fcf 100644
--- a/src/perl/lib/OpenSRF/Server.pm
+++ b/src/perl/lib/OpenSRF/Server.pm
@@ -169,6 +169,9 @@ sub run {
         $from_network = $wait_time = -1 if (!$msg);
         $msg ||= $self->{osrf_handle}->process($wait_time);
 
+        !$from_network and $chatty and $logger->debug("server: attempting to process previously queued message");
+        $from_network and $chatty and $logger->internal("server: no queued messages, processing due to network or signal");
+
         # we woke up for any reason, reset the wait time to allow
         # for idle maintenance as necessary
         $wait_time = 1;
@@ -198,11 +201,15 @@ sub run {
                     "in the OpenSRF configuration if this message occurs frequently");
 
                 if ($from_network) {
+                    $chatty and $logger->debug("server: queuing new message");
                     push @max_children_msg_queue, $msg;
                 } else {
+                    $chatty and $logger->debug("server: re-queuing old message");
                     unshift @max_children_msg_queue, $msg;
                 }
 
+                $logger->warn("server: backlog queue size is now ". scalar(@max_children_msg_queue));
+
                 if (@max_children_msg_queue < $self->{max_backlog_queue}) {
                     # We still have room on the queue. Set the wait time to
                     # 1s, waiting for a drone to be freed up and reprocess

commit 1abec0da61a46fa5a4a881c29a2e95a51e9013fa
Author: Mike Rylander <mrylander at gmail.com>
Date:   Wed Nov 1 11:11:08 2017 -0400

    LP#1729610: Add a service useful for testing behavior in slow response conditions
    
    This service, opensrf.slooooooow, offers an opensrf.slooooooow.wait method
    that waits for the number of seconds specified in its sole parameter.
    
    Signed-off-by: Mike Rylander <mrylander at gmail.com>
    Signed-off-by: Galen Charlton <gmc at equinoxinitiative.org>
    Signed-off-by: Bill Erickson <berickxx at gmail.com>
    Signed-off-by: Mike Rylander <mrylander at gmail.com>

diff --git a/examples/opensrf.xml.example b/examples/opensrf.xml.example
index 7e1dccb..2e52c7a 100644
--- a/examples/opensrf.xml.example
+++ b/examples/opensrf.xml.example
@@ -183,6 +183,24 @@ vim:et:ts=2:sw=2:
         </unix_config>
       </opensrf.settings>
 
+      <opensrf.slooooooow>
+        <keepalive>1</keepalive>
+        <stateless>1</stateless>
+        <language>perl</language>
+        <implementation>OpenSRF::Application::Slooooooow</implementation>
+        <max_requests>100</max_requests>
+        <unix_config>
+          <unix_sock>opensrf.slooooooow_unix.sock</unix_sock>
+          <unix_pid>opensrf.slooooooow_unix.pid</unix_pid>
+          <max_requests>1000</max_requests>
+          <unix_log>opensrf.slooooooow_unix.log</unix_log>
+          <min_children>2</min_children>
+          <max_children>5</max_children>
+          <min_spare_children>1</min_spare_children>
+          <max_spare_children>2</max_spare_children>
+        </unix_config>
+      </opensrf.slooooooow>
+
       <opensrf.validator>
         <keepalive>1</keepalive>
         <stateless>1</stateless>
@@ -242,6 +260,7 @@ vim:et:ts=2:sw=2:
         <appname>opensrf.math</appname>
         <appname>opensrf.dbmath</appname>
         <appname>opensrf.validator</appname>
+        <appname>opensrf.slooooooow</appname>
       </activeapps>
 
       <apps>
diff --git a/src/perl/lib/OpenSRF/Application/Slooooooow.pm b/src/perl/lib/OpenSRF/Application/Slooooooow.pm
new file mode 100644
index 0000000..bf218ce
--- /dev/null
+++ b/src/perl/lib/OpenSRF/Application/Slooooooow.pm
@@ -0,0 +1,48 @@
+package OpenSRF::Application::Slooooooow;
+use base qw/OpenSRF::Application/;
+use OpenSRF::Application;
+
+use OpenSRF::Utils::SettingsClient;
+use OpenSRF::EX qw/:try/;
+use OpenSRF::Utils qw/:common/;
+use OpenSRF::Utils::Logger;
+
+my $log;
+
+sub initialize {
+    $log = 'OpenSRF::Utils::Logger';
+}
+
+sub child_init {}
+
+sub wait_for_it {
+    my $self = shift;
+    my $client = shift;
+    my $pause = shift;
+
+    $pause =~ s/\D//g if (defined $pause);
+    $pause //= 1;
+
+    $log->info("Holding for $pause seconds...");
+    sleep($pause);
+    $log->info("Done waiting, time to return.");
+    return [$pause, @_]
+}
+__PACKAGE__->register_method(
+    api_name        => 'opensrf.slooooooow.wait',
+    method          => 'wait_for_it',
+    argc            => 1,
+    signature       => {
+        params => [
+            {name => "pause", type => "number", desc => "Seconds to sleep, can be fractional"},
+            {name => "extra", type => "string", desc => "Extra optional parameter used to inflate the payload size"}
+        ],
+        return => {
+            desc => "Array of passed parameters",
+            type => "array"
+        }
+    }
+
+);
+
+1;

commit 0201ca954002eb241d277c3068659bb1f8100bab
Author: Mike Rylander <mrylander at gmail.com>
Date:   Tue Oct 24 13:27:37 2017 -0400

    LP#1729610: Allow queuing (for a while) during child backlog
    
    This patch teaches OpenSRF listeners for Perl services how to maintain
    a queue of requests in case no drone process is immediately available
    to process a request.
    
    Signed-off-by: Mike Rylander <mrylander at gmail.com>
    Signed-off-by: Galen Charlton <gmc at equinoxinitiative.org>
    Signed-off-by: Bill Erickson <berickxx at gmail.com>
    Signed-off-by: Mike Rylander <mrylander at gmail.com>

diff --git a/src/perl/lib/OpenSRF/Server.pm b/src/perl/lib/OpenSRF/Server.pm
index 8efbc4c..0eaac40 100644
--- a/src/perl/lib/OpenSRF/Server.pm
+++ b/src/perl/lib/OpenSRF/Server.pm
@@ -51,6 +51,7 @@ sub new {
         if $self->{stderr_log_path};
 
     $self->{min_spare_children} ||= 0;
+    $self->{max_backlog_queue} ||= 1000;
 
     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
         $self->{max_spare_children} and
@@ -153,13 +154,20 @@ sub run {
     $self->register_routers;
     my $wait_time = 1;
 
+    my @max_children_msg_queue;
+
     # main server loop
     while(1) {
+        my $from_network = 0;
 
         $self->check_status;
         $self->{child_died} = 0;
 
-        my $msg = $self->{osrf_handle}->process($wait_time);
+        my $msg = shift(@max_children_msg_queue);
+
+        # no pending message, so wait for the next one forever
+        $from_network = $wait_time = -1 if (!$msg);
+        $msg ||= $self->{osrf_handle}->process($wait_time);
 
         # we woke up for any reason, reset the wait time to allow
         # for idle maintenance as necessary
@@ -188,11 +196,33 @@ sub run {
                 $logger->warn("server: no children available, waiting... consider increasing " .
                     "max_children for this application higher than $self->{max_children} ".
                     "in the OpenSRF configuration if this message occurs frequently");
-                $self->check_status(1); # block until child is available
 
-                my $child = pop(@{$self->{idle_list}});
-                push(@{$self->{active_list}}, $child);
-                $self->write_child($child, $msg);
+                if ($from_network) {
+                    push @max_children_msg_queue, $msg;
+                } else {
+                    unshift @max_children_msg_queue, $msg;
+                }
+
+                if (@max_children_msg_queue < $self->{max_backlog_queue}) {
+                    # We still have room on the queue. Set the wait time to
+                    # 1s, waiting for a drone to be freed up and reprocess
+                    # this (and any other) queued messages.
+                    $wait_time = 1;
+                    if (!$from_network) {
+                        # if we got here, we had retrieved a message from the queue
+                        # but couldn't process it... but also hadn't fetched any
+                        # additional messages from the network. Doing so now,
+                        # as otherwise only one message will ever get queued
+                        $msg = $self->{osrf_handle}->process($wait_time);
+                        if ($msg) {
+                            $chatty and $logger->debug("server: queuing new message after a re-queue");
+                            push @max_children_msg_queue, $msg;
+                        }
+                    }
+                } else {
+                    # We'll just have to wait
+                    $self->check_status(1); # block until child is available
+                }
             }
 
         } else {
diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm
index 0a84ae1..766df6a 100644
--- a/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm
+++ b/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm
@@ -379,9 +379,8 @@ sub flush_socket {
 	my $self = shift;
     return 0 unless $self->connected;
 
-    while ($self->wait(0)) {
-        # TODO remove this log line
-        $logger->info("flushing data from socket...");
+    while (my $excess = $self->wait(0)) {
+        $logger->info("flushing data from socket... $excess");
     }
 
     return $self->connected;

-----------------------------------------------------------------------

Summary of changes:
 examples/opensrf.xml.example                       |   39 +++++
 include/opensrf/osrf_message.h                     |    1 +
 src/c-apps/Makefile.am                             |    6 +-
 src/c-apps/osrf_cslow.c                            |   58 +++++++
 src/javascript/opensrf.js                          |    1 +
 src/libopensrf/osrf_prefork.c                      |  159 +++++++++++++++-----
 src/perl/lib/OpenSRF/Application/Slooooooow.pm     |   48 ++++++
 src/perl/lib/OpenSRF/DomainObject/oilsResponse.pm  |    3 +
 src/perl/lib/OpenSRF/Server.pm                     |   87 ++++++++++-
 src/perl/lib/OpenSRF/System.pm                     |    1 +
 .../lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm |    5 +-
 src/python/osrf/const.py                           |    1 +
 12 files changed, 359 insertions(+), 50 deletions(-)
 create mode 100644 src/c-apps/osrf_cslow.c
 create mode 100644 src/perl/lib/OpenSRF/Application/Slooooooow.pm


hooks/post-receive
-- 
OpenSRF


More information about the opensrf-commits mailing list