[Opensrf-commits] r2016 - in trunk: bin src/perl/lib/OpenSRF src/perl/lib/OpenSRF/Transport/SlimJabber (erickson)

svn at svn.open-ils.org svn at svn.open-ils.org
Tue Aug 31 20:17:13 EDT 2010


Author: erickson
Date: 2010-08-31 20:17:12 -0400 (Tue, 31 Aug 2010)
New Revision: 2016

Added:
   trunk/src/perl/lib/OpenSRF/Server.pm
Removed:
   trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Inbound.pm
   trunk/src/perl/lib/OpenSRF/UnixServer.pm
Modified:
   trunk/bin/opensrf-perl.pl.in
   trunk/src/perl/lib/OpenSRF/Application.pm
   trunk/src/perl/lib/OpenSRF/System.pm
   trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm
   trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm
Log:
Replace Net::Server with local pre-forking server

Support max/min children and max/min spare children
For more, see http://libmail.georgialibraries.org/pipermail/open-ils-dev/2010-May/006068.html

Modified: trunk/bin/opensrf-perl.pl.in
===================================================================
--- trunk/bin/opensrf-perl.pl.in	2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/bin/opensrf-perl.pl.in	2010-09-01 00:17:12 UTC (rev 2016)
@@ -110,6 +110,7 @@
 # start a specific service
 sub do_start {
     my $service = shift;
+
     if(-e get_pid_file($service)) {
         msg("$service is already running");
         return;
@@ -117,19 +118,9 @@
 
     load_settings() if $service eq 'opensrf.settings';
 
-    my $sclient = OpenSRF::Utils::SettingsClient->new;
-    my $apps = $sclient->config_value("activeapps", "appname");
-    OpenSRF::Transport::PeerHandle->retrieve->disconnect;
-
     if(grep { $_ eq $service } @hosted_services) {
         return unless do_daemon($service);
-        launch_net_server($service);
-        launch_listener($service);
-        $0 = "OpenSRF controller [$service]";
-        while(my $pid = waitpid(-1, 0)) {
-            last if $pid == -1;
-            $logger->debug("Cleaning up Perl $service process $pid");
-        }
+        OpenSRF::System->run_service($service);
     }
 
     msg("$service is not configured to run on $hostname");
@@ -169,6 +160,9 @@
     close STDIN;
     close STDOUT;
     close STDERR;
+    open STDIN, '</dev/null';
+    open STDOUT, '>/dev/null';
+    open STDERR, '>/dev/null';
     `echo $$ > $pid_file`;
     return 1;
 }
@@ -184,29 +178,6 @@
         $parser->get_server_config($conf->env->hostname);
 }
 
-# starts up the unix::server master process
-sub launch_net_server {
-    my $service = shift;
-    push @OpenSRF::UnixServer::ISA, 'Net::Server::PreFork';
-    unless(OpenSRF::Utils::safe_fork()) {
-        $0 = "OpenSRF Drone [$service]";
-        OpenSRF::UnixServer->new($service)->serve;
-        exit;
-    }
-    return 1;
-}
-
-# starts up the inbound listener process
-sub launch_listener {
-    my $service = shift;
-    unless(OpenSRF::Utils::safe_fork()) {
-        $0 = "OpenSRF listener [$service]";
-        OpenSRF::Transport::Listener->new($service)->initialize->listen;
-        exit;
-    }
-    return 1;
-}
-
 sub msg {
     my $m = shift;
     my $v = shift;

Modified: trunk/src/perl/lib/OpenSRF/Application.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/Application.pm	2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/src/perl/lib/OpenSRF/Application.pm	2010-09-01 00:17:12 UTC (rev 2016)
@@ -12,7 +12,6 @@
 use OpenSRF::EX qw/:try/;
 use Carp;
 use OpenSRF::Utils::JSON;
-#use OpenSRF::UnixServer;  # to get the server class from UnixServer::App
 
 sub DESTROY{};
 

Added: trunk/src/perl/lib/OpenSRF/Server.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/Server.pm	                        (rev 0)
+++ trunk/src/perl/lib/OpenSRF/Server.pm	2010-09-01 00:17:12 UTC (rev 2016)
@@ -0,0 +1,596 @@
+# ----------------------------------------------------------------
+# Copyright (C) 2010 Equinox Software, Inc.
+# Bill Erickson <erickson at esilibrary.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# ----------------------------------------------------------------
+package OpenSRF::Server;
+use strict;
+use warnings;
+use OpenSRF::Transport;
+use OpenSRF::Application;
+use OpenSRF::Utils::Config;
+use OpenSRF::Transport::PeerHandle;
+use OpenSRF::Utils::SettingsClient;
+use OpenSRF::Utils::Logger qw($logger);
+use OpenSRF::Transport::SlimJabber::Client;
+use POSIX qw/:sys_wait_h :errno_h/;
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+use IO::Select;
+use Socket;
+our $chatty = 1; # disable for production
+
+use constant STATUS_PIPE_DATA_SIZE => 12;
+
+sub new {
+    my($class, $service, %args) = @_;
+    my $self = bless(\%args, $class);
+
+    $self->{service}         = $service; # service name
+    $self->{num_children}    = 0; # number of child processes
+    $self->{osrf_handle}     = undef; # xmpp handle
+    $self->{routers}         = []; # list of registered routers
+    $self->{active_list}     = []; # list of active children
+    $self->{idle_list}       = []; # list of idle children
+
+    $self->{min_spare_children} ||= 0;
+
+    $self->{max_spare_children} = $self->{min_spare_children} + 1 if
+        $self->{max_spare_children} and
+        $self->{max_spare_children} <= $self->{min_spare_children};
+
+    return $self;
+}
+
+# ----------------------------------------------------------------
+# Terminates the children, unregisters from the routers, drops the
+# Jabber connection and waits for the children to exit.  Exits the
+# process unless $no_exit is true.
+# ----------------------------------------------------------------
+sub cleanup {
+    my($self, $no_exit) = @_;
+    $logger->info("server: shutting down and cleaning up...");
+
+    # don't get sidetracked by signals while we're cleaning up.
+    # it could result in unexpected behavior with list traversal
+    $SIG{CHLD} = 'IGNORE';
+
+    # terminate the child processes
+    $self->kill_child($_) for
+        (@{$self->{idle_list}}, @{$self->{active_list}});
+
+    # the handle is still undef if we are interrupted before
+    # build_osrf_handle has run; there is nothing to tear down then
+    if($self->{osrf_handle}) {
+        $self->unregister_routers;
+        $self->{osrf_handle}->disconnect;
+    }
+
+    $self->reap_children(1); # clean up our dead children
+    exit(0) unless $no_exit;
+}
+
+
+# ----------------------------------------------------------------
+# Waits on the jabber socket for inbound data from the router.
+# Each new message is passed off to a child process for handling.
+# At regular intervals, wake up for min/max spare child maintenance
+# ----------------------------------------------------------------
+sub run {
+    my $self = shift;
+
+	$logger->set_service($self->{service});
+
+    $SIG{$_} = sub { $self->cleanup; } for (qw/INT TERM QUIT/);
+    $SIG{CHLD} = sub { $self->reap_children(); };
+
+    $self->spawn_children;
+    $self->build_osrf_handle;
+    $self->register_routers;
+    my $wait_time = 1;
+
+    # main server loop
+    while(1) {
+
+        $self->check_status;
+        $self->{child_died} = 0;
+
+        my $msg = $self->{osrf_handle}->process($wait_time);
+
+        # we woke up for any reason, reset the wait time to allow
+        # for idle maintenance as necessary
+        $wait_time = 1;
+
+        if($msg) {
+
+            if(my $child = pop(@{$self->{idle_list}})) {
+
+                # we have an idle child to handle the request
+                $chatty and $logger->internal("server: passing request to idle child $child");
+                push(@{$self->{active_list}}, $child);
+                $self->write_child($child, $msg);
+
+            } elsif($self->{num_children} < $self->{max_children}) {
+
+                # spawning a child to handle the request
+                $chatty and $logger->internal("server: spawning child to handle request");
+                $self->write_child($self->spawn_child(1), $msg);
+
+            } else {
+
+                $logger->warn("server: no children available, waiting...");
+                $self->check_status(1); # block until child is available
+
+                my $child = pop(@{$self->{idle_list}});
+                push(@{$self->{active_list}}, $child);
+                $self->write_child($child, $msg);
+            }
+
+        } else {
+
+            # don't perform idle maint immediately when woken by SIGCHLD
+            unless($self->{child_died}) {
+
+                # when we hit equilibrium, there's no need for regular
+                # maintenance, so set wait_time to 'forever'
+                $wait_time = -1 unless $self->perform_idle_maintenance;
+            }
+        }
+    }
+}
+
+# ----------------------------------------------------------------
+# Launch a new spare child or kill an extra spare child.  To
+# prevent large-scale spawning or die-offs, spawn or kill only
+# 1 process per idle maintenance loop.
+# Returns true if any idle maintenance occurred, 0 otherwise
+# ----------------------------------------------------------------
+sub perform_idle_maintenance {
+    my $self = shift;
+
+    # spawn 1 spare child per maintenance loop if necessary
+    if( $self->{min_spare_children} and
+        $self->{num_children} < $self->{max_children} and
+        scalar(@{$self->{idle_list}}) < $self->{min_spare_children} ) {
+
+        $chatty and $logger->internal("server: spawning spare child");
+        $self->spawn_child;
+        return 1;
+
+    # kill 1 excess spare child per maintenance loop if necessary
+    } elsif($self->{max_spare_children} and
+            $self->{num_children} > $self->{min_children} and
+            scalar(@{$self->{idle_list}}) > $self->{max_spare_children} ) {
+
+        $chatty and $logger->internal("server: killing spare child");
+        $self->kill_child;
+        return 1;
+    }
+
+    return 0;
+}
+
+sub kill_child {
+    my $self = shift;
+    my $child = shift || pop(@{$self->{idle_list}}) or return;
+    $chatty and $logger->internal("server: killing child $child");
+    kill('TERM', $child->{pid});
+}
+
+# ----------------------------------------------------------------
+# Builds the Jabber connection that inbound messages arrive on.
+# ----------------------------------------------------------------
+sub build_osrf_handle {
+    my $self = shift;
+
+    my $conf = OpenSRF::Utils::Config->current;
+    my $username = $conf->bootstrap->username;
+    my $password = $conf->bootstrap->passwd;
+    my $domain = $conf->bootstrap->domain;
+    my $port = $conf->bootstrap->port;
+    my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
+
+    $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
+
+    $self->{osrf_handle} =
+        OpenSRF::Transport::SlimJabber::Client->new(
+            username => $username,
+            resource => $resource,
+            password => $password,
+            host => $domain,
+            port => $port,
+        );
+
+    $self->{osrf_handle}->initialize;
+}
+
+
+# ----------------------------------------------------------------
+# Sends request data to a child process
+# ----------------------------------------------------------------
+sub write_child {
+    my($self, $child, $msg) = @_;
+    my $xml = $msg->to_xml;
+    syswrite($child->{pipe_to_child}, $xml);
+}
+
+# ----------------------------------------------------------------
+# Checks to see if any child process has reported its availability
+# In blocking mode, blocks until a child has reported.
+# ----------------------------------------------------------------
+sub check_status {
+    my($self, $block) = @_;
+
+    my $read_set = IO::Select->new;
+    $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
+
+    my @handles = $read_set->can_read(($block) ? undef : 0) or return;
+
+    my $pid = '';
+    my @pids;
+    for my $pipe (@handles) {
+        sysread($pipe, $pid, STATUS_PIPE_DATA_SIZE) or next;
+        push(@pids, int($pid));
+    }
+
+    $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
+
+    my $child;
+    my @new_actives;
+
+    # move the children from the active list to the idle list
+    for my $proc (@{$self->{active_list}}) {
+        if(grep { $_ == $proc->{pid} } @pids) {
+            push(@{$self->{idle_list}}, $proc);
+        } else {
+            push(@new_actives, $proc);
+        }
+    }
+
+    $self->{active_list} = [@new_actives];
+
+    $chatty and $logger->internal(sprintf(
+        "server: %d idle and %d active children after status update",
+            scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
+}
+
+# ----------------------------------------------------------------
+# Cleans up any child processes that have exited (SIGCHLD handler).
+# In shutdown mode, block until all children have washed ashore
+# ----------------------------------------------------------------
+sub reap_children {
+    my($self, $shutdown) = @_;
+    $self->{child_died} = 1;
+
+    while(1) {
+        my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG);
+        last if $pid <= 0; # leave the loop only; the respawn below must still run
+
+        $chatty and $logger->internal("server: reaping child $pid");
+        my ($child) = grep {$_->{pid} == $pid} (@{$self->{active_list}}, @{$self->{idle_list}});
+
+        # prune the lists first: the pid they match on is wiped below
+        $self->{num_children}--;
+        $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ];
+        $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ];
+
+        # a pid we have no record of has no pipes to close
+        next unless $child;
+        close($child->{pipe_to_parent});
+        close($child->{pipe_to_child});
+        delete $child->{$_} for keys %$child; # destroy with a vengeance
+    }
+
+    $self->spawn_children unless $shutdown;
+
+    $chatty and $logger->internal(sprintf(
+        "server: %d idle and %d active children after reap_children",
+            scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
+}
+
+# ----------------------------------------------------------------
+# Spawn children until min_children processes are running
+# ----------------------------------------------------------------
+sub spawn_children {
+    my $self = shift;
+    $self->spawn_child while $self->{num_children} < $self->{min_children};
+}
+
+# ----------------------------------------------------------------
+# Spawns a new child.  If $active is set, the child goes directly
+# into the active_list; otherwise it joins the idle_list.
+# Returns the child object in the parent, or undef if the
+# socketpair or fork fails.  Never returns in the child process.
+# ----------------------------------------------------------------
+sub spawn_child {
+    my($self, $active) = @_;
+
+    my $child = OpenSRF::Server::Child->new($self);
+
+    # socket for sending message data to the child
+    if(!socketpair(
+        $child->{pipe_to_child},
+        $child->{pipe_to_parent},
+        AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
+            $logger->error("server: error creating data socketpair: $!");
+            return undef;
+    }
+
+    $child->{pipe_to_child}->autoflush(1);
+    $child->{pipe_to_parent}->autoflush(1);
+
+    $child->{pid} = fork();
+
+    # fork() returns undef on failure; without this check the parent
+    # would fall into the child branch below and exit
+    if(!defined $child->{pid}) {
+        $logger->error("server: error forking child: $!");
+        return undef;
+    }
+
+    if($child->{pid}) { # parent process
+        $self->{num_children}++;
+        push(@{$self->{$active ? 'active_list' : 'idle_list'}}, $child);
+        $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
+        return $child;
+    }
+
+    # child process
+    $SIG{$_} = 'DEFAULT' for (qw/INT TERM QUIT HUP/);
+
+    $child->{pid} = $$;
+    eval {
+        $child->init;
+        $child->run;
+        OpenSRF::Transport::PeerHandle->retrieve->disconnect;
+    };
+    $logger->error("server: child process died: $@") if $@;
+    exit(0);
+}
+
+# ----------------------------------------------------------------
+# Sends the register command to the configured routers
+# ----------------------------------------------------------------
+sub register_routers {
+    my $self = shift;
+
+    my $conf = OpenSRF::Utils::Config->current;
+    my $routers = $conf->bootstrap->routers;
+    my $router_name = $conf->bootstrap->router_name;
+    my @targets;
+
+    for my $router (@$routers) {
+        if(ref $router) {
+
+            if( !$router->{services} ||
+                !$router->{services}->{service} ||
+                (
+                    ref($router->{services}->{service}) eq 'ARRAY' and
+                    grep { $_ eq $self->{service} } @{$router->{services}->{service}}
+                )  || $router->{services}->{service} eq $self->{service}) {
+
+                my $name = $router->{name};
+                my $domain = $router->{domain};
+                push(@targets, "$name\@$domain/router");
+            }
+
+        } else {
+            push(@targets, "$router_name\@$router/router");
+        }
+    }
+
+    foreach (@targets) {
+        $logger->info("server: registering with router $_");
+        $self->{osrf_handle}->send(
+            to => $_,
+            body => 'registering',
+            router_command => 'register',
+            router_class => $self->{service}
+        );
+    }
+
+    $self->{routers} = \@targets;
+}
+
+# ----------------------------------------------------------------
+# Sends the unregister command to any routers we have registered
+# with.
+# ----------------------------------------------------------------
+sub unregister_routers {
+    my $self = shift;
+    return unless $self->{osrf_handle}->tcp_connected;
+
+	for my $router (@{$self->{routers}}) {
+        $logger->info("server: disconnecting from router $router");
+        $self->{osrf_handle}->send(
+            to => $router,
+            body => "unregistering",
+            router_command => "unregister",
+            router_class => $self->{service}
+        );
+    }
+}
+
+
+package OpenSRF::Server::Child;
+use strict;
+use warnings;
+use OpenSRF::Transport;
+use OpenSRF::Application;
+use OpenSRF::Transport::PeerHandle;
+use OpenSRF::Transport::SlimJabber::XMPPMessage;
+use OpenSRF::Utils::Logger qw($logger);
+use OpenSRF::DomainObject::oilsResponse qw/:status/;
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+use Time::HiRes qw(time);
+use POSIX qw/:sys_wait_h :errno_h/;
+
+use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
+
+sub new {
+    my($class, $parent) = @_;
+    my $self = bless({}, $class);
+    $self->{pid} = 0; # my process ID
+    $self->{parent} = $parent; # Controller parent process
+    $self->{num_requests} = 0; # total serviced requests
+    return $self;
+}
+
+sub set_nonblock {
+    my($self, $fh) = @_;
+    my  $flags = fcntl($fh, F_GETFL, 0);
+    fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
+}
+
+sub set_block {
+    my($self, $fh) = @_;
+    my  $flags = fcntl($fh, F_GETFL, 0);
+    $flags &= ~O_NONBLOCK;
+    fcntl($fh, F_SETFL, $flags);
+}
+
+# ----------------------------------------------------------------
+# Connects to Jabber and runs the application child_init
+# ----------------------------------------------------------------
+sub init {
+    my $self = shift;
+    my $service = $self->{parent}->{service};
+    $0 = "OpenSRF Drone [$service]";
+    OpenSRF::Transport::PeerHandle->construct($service);
+	OpenSRF::Application->application_implementation->child_init
+		if (OpenSRF::Application->application_implementation->can('child_init'));
+}
+
+# ----------------------------------------------------------------
+# Waits for messages from the parent process, handles the message,
+# then goes into the keepalive loop if this is a stateful session.
+# When max_requests is hit, the process exits.
+# ----------------------------------------------------------------
+sub run {
+    my $self = shift;
+    my $network = OpenSRF::Transport::PeerHandle->retrieve;
+
+    # main child run loop.  Ends when this child hits max requests.
+    while(1) {
+
+        my $data = $self->wait_for_request or next;
+
+        # Update process name to show activity
+        my $orig_name = $0;
+        $0 = "$0*";
+
+        # Discard extraneous data from the jabber socket
+        if(!$network->flush_socket()) {
+            $logger->error("server: network disconnected!  child dropping request and exiting: $data");
+            exit;
+        }
+
+        my $session = OpenSRF::Transport->handler(
+            $self->{parent}->{service},
+            OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
+        );
+
+        $self->keepalive_loop($session);
+
+        last if ++$self->{num_requests} == $self->{parent}->{max_requests};
+
+        # Tell the parent process we are available to process requests
+        $self->send_status;
+
+        # Repair process name
+        $0 = $orig_name;
+    }
+
+    $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
+
+	OpenSRF::Application->application_implementation->child_exit
+		if (OpenSRF::Application->application_implementation->can('child_exit'));
+}
+
+# ----------------------------------------------------------------
+# Waits for request data on the parent pipe and returns it.
+# ----------------------------------------------------------------
+sub wait_for_request {
+    my $self = shift;
+
+    my $data = '';
+    my $read_size = 1024;
+    my $nonblock = 0;
+
+    while(1) {
+        # Start out blocking, when data is available, read it all
+
+        my $buf = '';
+        my $n = sysread($self->{pipe_to_parent}, $buf, $read_size);
+
+        unless(defined $n) {
+            $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!; 
+            last;
+        }
+
+        last if $n <= 0; # no data left to read
+
+        $data .= $buf;
+
+        last if $n < $read_size; # done reading all data
+
+        $self->set_nonblock($self->{pipe_to_parent}) unless $nonblock;
+        $nonblock = 1;
+    }
+
+    $self->set_block($self->{pipe_to_parent}) if $nonblock;
+    return $data;
+}
+
+
+# ----------------------------------------------------------------
+# If this is a stateful opensrf session, wait up to $keepalive
+# seconds for subsequent requests from the client
+# ----------------------------------------------------------------
+sub keepalive_loop {
+    my($self, $session) = @_;
+    my $keepalive = $self->{parent}->{keepalive};
+
+    while($session->state and $session->state == $session->CONNECTED) {
+
+        unless( $session->queue_wait($keepalive) ) {
+
+            # client failed to disconnect before timeout
+            $logger->info("server: no request was received in $keepalive seconds, exiting stateful session");
+
+            my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
+                status => "Disconnected on timeout",
+                statusCode => STATUS_TIMEOUT
+            );
+
+            $session->status($res);
+            $session->state($session->DISCONNECTED);
+            last;
+        }
+    }
+
+    $chatty and $logger->internal("server: child done with request(s)");
+    $session->kill_me;
+}
+
+# ----------------------------------------------------------------
+# Report our availability to our parent process
+# ----------------------------------------------------------------
+sub send_status {
+    my $self = shift;
+    syswrite(
+        $self->{pipe_to_parent},
+        sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})
+    );
+}
+
+
+1;

Modified: trunk/src/perl/lib/OpenSRF/System.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/System.pm	2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/src/perl/lib/OpenSRF/System.pm	2010-09-01 00:17:12 UTC (rev 2016)
@@ -2,10 +2,9 @@
 use strict; use warnings;
 use OpenSRF;
 use base 'OpenSRF';
-use OpenSRF::Utils::Logger qw(:level);
+use OpenSRF::Utils::Logger qw($logger);
 use OpenSRF::Transport::Listener;
 use OpenSRF::Transport;
-use OpenSRF::UnixServer;
 use OpenSRF::Utils;
 use OpenSRF::EX qw/:try/;
 use POSIX qw/setsid :sys_wait_h/;
@@ -13,7 +12,7 @@
 use OpenSRF::Utils::SettingsParser;
 use OpenSRF::Utils::SettingsClient;
 use OpenSRF::Application;
-use Net::Server::PreFork;
+use OpenSRF::Server;
 
 my $bootstrap_config_file;
 sub import {
@@ -67,4 +66,47 @@
 	return 0;
 }
 
+sub run_service {
+    my($class, $service) = @_;
+
+    $0 = "OpenSRF Listener [$service]";
+
+    # temp connection to use for application initialization
+    OpenSRF::System->bootstrap_client(client_name => "system_client");
+
+    my $sclient = OpenSRF::Utils::SettingsClient->new;
+    my $getval = sub { $sclient->config_value(apps => $service => @_); };
+
+    my $impl = $getval->('implementation');
+
+    OpenSRF::Application::server_class($service);
+    OpenSRF::Application->application_implementation($impl);
+    OpenSRF::Utils::JSON->register_class_hint(name => $impl, hint => $service, type => 'hash');
+    OpenSRF::Application->application_implementation->initialize()
+        if (OpenSRF::Application->application_implementation->can('initialize'));
+
+    # kill the temp connection
+    OpenSRF::Transport::PeerHandle->retrieve->disconnect;
+
+    my $server = OpenSRF::Server->new(
+        $service,
+        keepalive => $getval->('keepalive') || 5,
+        max_requests =>  $getval->(unix_config => 'max_requests') || 10000,
+        max_children =>  $getval->(unix_config => 'max_children') || 20,
+        min_children =>  $getval->(unix_config => 'min_children') || 1,
+        min_spare_children =>  $getval->(unix_config => 'min_spare_children'),
+        max_spare_children =>  $getval->(unix_config => 'max_spare_children')
+    );
+
+    while(1) {
+        eval { $server->run; };
+        # we only arrive here if the server died a painful death
+        $logger->error("server: died with error $@");
+        $server->cleanup(1);
+        $logger->info("server: restarting after fatal crash...");
+        sleep 2;
+    }
+}
+
+
 1;

Modified: trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm	2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm	2010-09-01 00:17:12 UTC (rev 2016)
@@ -8,13 +8,8 @@
 use OpenSRF::Utils::Logger qw/$logger/;
 use OpenSRF::Transport::SlimJabber::XMPPReader;
 use OpenSRF::Transport::SlimJabber::XMPPMessage;
-use IO::Socket::UNIX;
-use FreezeThaw qw/freeze/;
+use IO::Socket::INET;
 
-sub DESTROY{
-    shift()->disconnect;
-}
-
 =head1 NAME
 
 OpenSRF::Transport::SlimJabber::Client
@@ -153,6 +148,7 @@
 	    unless ( $self->reader->connected );
 
     $self->xmpp_id("$username\@$host/$resource");
+    $logger->debug("Created XMPP connection " . $self->xmpp_id);
 	return $self;
 }
 

Deleted: trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Inbound.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Inbound.pm	2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Inbound.pm	2010-09-01 00:17:12 UTC (rev 2016)
@@ -1,201 +0,0 @@
-package OpenSRF::Transport::SlimJabber::Inbound;
-use strict;use warnings;
-use base qw/OpenSRF::Transport::SlimJabber::Client/;
-use OpenSRF::EX qw(:try);
-use OpenSRF::Utils::Logger qw(:level);
-use OpenSRF::Utils::SettingsClient;
-use OpenSRF::Utils::Config;
-use Time::HiRes qw/usleep/;
-use FreezeThaw qw/freeze/;
-
-my $logger = "OpenSRF::Utils::Logger";
-
-=head1 Description
-
-This is the jabber connection where all incoming client requests will be accepted.
-This connection takes the data, passes it off to the system then returns to take
-more data.  Connection params are all taken from the config file and the values
-retreived are based on the $app name passed into new().
-
-This service should be loaded at system startup.
-
-=cut
-
-{
-	my $unix_sock;
-	sub unix_sock { return $unix_sock; }
-	my $instance;
-
-	sub new {
-		my( $class, $app ) = @_;
-		$class = ref( $class ) || $class;
-		if( ! $instance ) {
-
-			my $conf = OpenSRF::Utils::Config->current;
-			my $domain = $conf->bootstrap->domain;
-            $logger->error("use of <domains/> is deprecated") if $conf->bootstrap->domains;
-
-			my $username	= $conf->bootstrap->username;
-			my $password	= $conf->bootstrap->passwd;
-			my $port			= $conf->bootstrap->port;
-			my $host			= $domain;
-			my $resource	= $app . '_listener_at_' . $conf->env->hostname;
-
-            my $no_router = 0; # make this a config entry if we want to use it
-			if($no_router) { 
-			    # no router, only one listener running..
-				$username = "router";
-				$resource = $app; 
-			}
-
-			OpenSRF::Utils::Logger->transport("Inbound as $username, $password, $resource, $host, $port\n", INTERNAL );
-
-			my $self = __PACKAGE__->SUPER::new( 
-					username		=> $username,
-					resource		=> $resource,
-					password		=> $password,
-					host			=> $host,
-					port			=> $port,
-					);
-
-			$self->{app} = $app;
-					
-			my $client = OpenSRF::Utils::SettingsClient->new();
-			my $f = $client->config_value("dirs", "sock");
-			$unix_sock = join( "/", $f, 
-					$client->config_value("apps", $app, "unix_config", "unix_sock" ));
-			bless( $self, $class );
-			$instance = $self;
-		}
-		return $instance;
-	}
-
-}
-
-sub DESTROY {
-	my $self = shift;
-	for my $router (@{$self->{routers}}) {
-		if($self->tcp_connected()) {
-            $logger->info("disconnecting from router $router");
-			$self->send( to => $router, body => "registering", 
-				router_command => "unregister" , router_class => $self->{app} );
-		}
-	}
-}
-
-my $sig_pipe = 0;
-	
-sub listen {
-	my $self = shift;
-	
-    $self->{routers} = [];
-
-	try {
-
-		my $conf = OpenSRF::Utils::Config->current;
-        my $router_name = $conf->bootstrap->router_name;
-		my $routers = $conf->bootstrap->routers;
-        $logger->info("loading router info $routers");
-
-        for my $router (@$routers) {
-            if(ref $router) {
-                if( !$router->{services} || 
-                    !$router->{services}->{service} || 
-                    ( 
-                        ref($router->{services}->{service}) eq 'ARRAY' and 
-                        grep { $_ eq $self->{app} } @{$router->{services}->{service}} )  ||
-                    $router->{services}->{service} eq $self->{app}) {
-
-                    my $name = $router->{name};
-                    my $domain = $router->{domain};
-                    my $target = "$name\@$domain/router";
-                    push(@{$self->{routers}}, $target);
-                    $logger->info( $self->{app} . " connecting to router $target");
-                    $self->send( to => $target, body => "registering", router_command => "register" , router_class => $self->{app} );
-                }
-            } else {
-                my $target = "$router_name\@$router/router";
-                push(@{$self->{routers}}, $target);
-                $logger->info( $self->{app} . " connecting to router $target");
-                $self->send( to => $target, body => "registering", router_command => "register" , router_class => $self->{app} );
-            }
-        }
-		
-	} catch Error with {
-        my $err = shift;
-		$logger->error($self->{app} . ": No routers defined: $err");
-		# no routers defined
-	};
-
-    my $app = $self->{app};
-
-	$logger->info("$app inbound: going into listen loop" );
-
-	while(1) {
-	
-		my $sock = $self->unix_sock();
-		my $o;
-
-		try {
-			$o = $self->process(-1);
-
-			if(!$o){
-				$logger->error("$app inbound: received no data from the Jabber socket in process()");
-				usleep(100000); # otherwise we loop and pound syslog logger with errors
-			}
-
-		} catch OpenSRF::EX::JabberDisconnected with {
-
-			$logger->error("$app inbound: process lost its jabber connection.  Attempting to reconnect...");
-			$self->initialize;
-			$o = undef;
-		};
-
-        next unless $o;
-
-        while(1) {
-            # keep trying to deliver the message until we succeed
-
-            my $socket = IO::Socket::UNIX->new( Peer => $sock  );
-
-            unless($socket and $socket->connected) {
-                $logger->error("$app inbound: unable to connect to inbound socket $sock: $!");
-                usleep(50000); # 50 msec
-                next;
-            }
-
-            # block until the pipe is ready for writing
-            my $outfile = ''; 
-            vec($outfile, $socket->fileno, 1) = 1;
-            my $nfound = select(undef, $outfile, undef, undef);
-
-            next unless $nfound; # should not happen since we're blocking
-
-            if($nfound == -1) { # select failed
-                $logger->error("$app inbound: unable to write to socket: $!");
-                usleep(50000); # 50 msec
-                next;
-            }
-
-            $sig_pipe = 0;
-            local $SIG{'PIPE'} = sub { $sig_pipe = 1; };
-            print $socket freeze($o);
-
-            if($sig_pipe) {
-                # The attempt to write to the socket failed.  Wait a short time then try again.
-                # Don't bother closing the socket, it will only cause grief
-                $logger->error("$app inbound: got SIGPIPE, will retry after a short wait..."); 
-                usleep(50000); # 50 msec
-                next;
-            } 
-                
-            $socket->close;
-            last;
-        }
-	}
-
-    $logger->error("$app inbound: exited process loop");
-}
-
-1;
-

Modified: trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm	2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm	2010-09-01 00:17:12 UTC (rev 2016)
@@ -60,8 +60,6 @@
 	OpenSRF::EX::Config->throw( "JPeer could not load all necessary values from config" )
 		unless ( $username and $password and $resource and $host and $port );
 
-	OpenSRF::Utils::Logger->transport( "Built Peer with", INTERNAL );
-
 	my $self = __PACKAGE__->SUPER::new( 
 		username		=> $username,
 		resource		=> $resource,

Deleted: trunk/src/perl/lib/OpenSRF/UnixServer.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/UnixServer.pm	2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/src/perl/lib/OpenSRF/UnixServer.pm	2010-09-01 00:17:12 UTC (rev 2016)
@@ -1,266 +0,0 @@
-package OpenSRF::UnixServer;
-use strict; use warnings;
-use base qw/OpenSRF/;
-use OpenSRF::EX qw(:try);
-use OpenSRF::Utils::Logger qw(:level $logger);
-use OpenSRF::Transport::PeerHandle;
-use OpenSRF::Application;
-use OpenSRF::AppSession;
-use OpenSRF::DomainObject::oilsResponse qw/:status/;
-use OpenSRF::System;
-use OpenSRF::Utils::SettingsClient;
-use Time::HiRes qw(time);
-use OpenSRF::Utils::JSON;
-use vars qw/@ISA $app/;
-use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
-use Carp;
-use FreezeThaw qw/thaw/;
-
-use IO::Socket::INET;
-use IO::Socket::UNIX;
-
-sub DESTROY { confess "Dying $$"; }
-
-=head1 What am I
-
-All inbound messages are passed on to the UnixServer for processing.
-We take the data, close the Unix socket, and pass the data on to our abstract
-'process()' method.  
-
-Our purpose is to 'multiplex' a single TCP connection into multiple 'client' connections.
-So when you pass data down the Unix socket to us, we have been preforked and waiting
-to disperse new data among us.
-
-=cut
-
-sub app { return $app; }
-
-{
-
-	sub new {
-		my( $class, $app1 ) = @_;
-		if( ! $app1 ) {
-			throw OpenSRF::EX::InvalidArg( "UnixServer requires an app name to run" );
-		}
-		$app = $app1;
-		my $self = bless( {}, $class );
-#		my $client = OpenSRF::Utils::SettingsClient->new();
-#		if( $client->config_value("server_type") !~ /fork/i || 
-#				OpenSRF::Utils::Config->current->bootstrap->settings_config ) {
-#			warn "Calling hooks for non-prefork\n";
-#			$self->configure_hook();
-#			$self->child_init_hook();
-#		}
-		return $self;
-	}
-
-}
-
-=head2 process_request()
-
-Takes the incoming data, closes the Unix socket and hands the data untouched 
-to the abstract process() method.  This method is implemented in our subclasses.
-
-=cut
-
-sub process_request {
-
-	my $self = shift;
-	my $data; my $d;
-	while( $d = <STDIN> ) { $data .= $d; }
-
-	my $orig = $0;
-	$0 = "$0*";
-
-	if( ! $data or ! defined( $data ) or $data eq "" ) {
-		close($self->{server}->{client}); 
-		$logger->debug("Unix child received empty data from socket", ERROR);
-		$0 = $orig;
-		return;
-	}
-
-
-	if( ! close( $self->{server}->{client} ) ) {
-		$logger->debug( "Error closing Unix socket: $!", ERROR );
-	}
-
-	my $app = $self->app();
-	$logger->transport( "UnixServer for $app received $data", INTERNAL );
-
-	# --------------------------------------------------------------
-	# Drop all data from the socket before coninuting to process
-	# --------------------------------------------------------------
-	my $ph = OpenSRF::Transport::PeerHandle->retrieve;
-	if(!$ph->flush_socket()) {
-		$logger->error("We received a request ".
-			"and we are no longer connected to the jabber network. ".
-			"We will go away and drop this request: $data");
-		exit;
-	}
-
-    ($data) = thaw($data);
-	my $app_session = OpenSRF::Transport->handler( $self->app(), $data );
-
-	if(!ref($app_session)) {
-		$logger->transport( "Did not receive AppSession from transport handler, returning...", WARN );
-		$0 = $orig;
-		return;
-	}
-
-	if($app_session->stateless and $app_session->state != $app_session->CONNECTED()){
-		$logger->debug("Exiting keepalive for stateless session / orig = $orig");
-		$app_session->kill_me;
-		$0 = $orig;
-		return;
-	}
-
-
-	my $client = OpenSRF::Utils::SettingsClient->new();
-	my $keepalive = $client->config_value("apps", $self->app(), "keepalive");
-
-	my $req_counter = 0;
-	while( $app_session and 
-			$app_session->state and 
-			$app_session->state != $app_session->DISCONNECTED() and
-			$app_session->find( $app_session->session_id ) ) {
-		
-
-		my $before = time;
-		$logger->debug( "UnixServer calling queue_wait $keepalive", INTERNAL );
-		$app_session->queue_wait( $keepalive );
-		$logger->debug( "after queue wait $keepalive", INTERNAL );
-		my $after = time;
-
-		if( ($after - $before) >= $keepalive ) { 
-
-			my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
-									status => "Disconnected on timeout",
-									statusCode => STATUS_TIMEOUT);
-			$app_session->status($res);
-			$app_session->state( $app_session->DISCONNECTED() );
-			last;
-		}
-	
-	}
-
-	my $x = 0;
-	while( $app_session && $app_session->queue_wait(0) ) {
-		$logger->debug( "Looping on zombies " . $x++ , DEBUG);
-	}
-
-	$logger->debug( "Timed out, disconnected, or authentication failed" );
-	$app_session->kill_me if ($app_session);
-
-	$0 = $orig;
-}
-
-
-sub serve {
-	my( $self ) = @_;
-
-	my $app = $self->app();
-	$logger->set_service($app);
-
-	$0 = "OpenSRF master [$app]";
-
-	my $client = OpenSRF::Utils::SettingsClient->new();
-    my @base = ('apps', $app, 'unix_config' );
-
-	my $min_servers = $client->config_value(@base, 'min_children');
-	my $max_servers = $client->config_value(@base, "max_children" );
-	my $min_spare =	$client->config_value(@base, "min_spare_children" );
-	my $max_spare = $client->config_value(@base, "max_spare_children" );
-	my $max_requests = $client->config_value(@base, "max_requests" );
-    # fwiw, these file paths are (obviously) not portable
-	my $log_file = join("/", $client->config_value("dirs", "log"), $client->config_value(@base, "unix_log" ));
-	my $port = join("/", $client->config_value("dirs", "sock"), $client->config_value(@base, "unix_sock" ));
-	my $pid_file = join("/", $client->config_value("dirs", "pid"), $client->config_value(@base, "unix_pid" ));
-
-    $min_spare ||= $min_servers;
-    $max_spare ||= $max_servers;
-    $max_requests ||= 1000;
-
-    $logger->info("UnixServer: min=$min_servers, max=$max_servers, min_spare=$min_spare ".
-        "max_spare=$max_spare, max_req=$max_requests, log_file=$log_file, port=$port, pid_file=$pid_file");
-
-    $self->run(
-        min_servers => $min_servers,
-        max_servers => $max_servers,
-        min_spare_servers => $min_spare,
-        max_spare_servers => $max_spare,
-        max_requests => $max_requests,
-        log_file => $log_file,
-        port => $port,
-        proto => 'unix',
-        pid_file => $pid_file,
-    );
-
-}
-
-
-sub configure_hook {
-	my $self = shift;
-	my $app = $self->app;
-
-	# boot a client
-	OpenSRF::System->bootstrap_client( client_name => "system_client" );
-
-	$logger->debug( "Setting application implementation for $app", DEBUG );
-	my $client = OpenSRF::Utils::SettingsClient->new();
-	my $imp = $client->config_value("apps", $app, "implementation");
-	OpenSRF::Application::server_class($app);
-	OpenSRF::Application->application_implementation( $imp );
-	OpenSRF::Utils::JSON->register_class_hint( name => $imp, hint => $app, type => "hash" );
-	OpenSRF::Application->application_implementation->initialize()
-		if (OpenSRF::Application->application_implementation->can('initialize'));
-
-	if( $client->config_value("server_type") !~ /fork/i  ) {
-		$self->child_init_hook();
-	}
-
-	my $con = OpenSRF::Transport::PeerHandle->retrieve;
-	if($con) {
-		$con->disconnect;
-	}
-
-	return OpenSRF::Application->application_implementation;
-}
-
-sub child_init_hook { 
-
-	$0 =~ s/master/drone/g;
-
-	if ($ENV{OPENSRF_PROFILE}) {
-		my $file = $0;
-		$file =~ s/\W/_/go;
-		eval "use Devel::Profiler output_file => '/tmp/profiler_$file.out', buffer_size => 0;";
-		if ($@) {
-			$logger->debug("Could not load Devel::Profiler: $@",ERROR);
-		} else {
-			$0 .= ' [PROFILING]';
-			$logger->debug("Running under Devel::Profiler", INFO);
-		}
-	}
-
-	my $self = shift;
-
-#	$logger->transport( 
-#			"Creating PeerHandle from UnixServer child_init_hook", INTERNAL );
-	OpenSRF::Transport::PeerHandle->construct( $self->app() );
-	$logger->transport( "PeerHandle Created from UnixServer child_init_hook", INTERNAL );
-
-	OpenSRF::Application->application_implementation->child_init
-		if (OpenSRF::Application->application_implementation->can('child_init'));
-
-	return OpenSRF::Transport::PeerHandle->retrieve;
-}
-
-sub child_finish_hook {
-    $logger->debug("attempting to call child exit handler...");
-	OpenSRF::Application->application_implementation->child_exit
-		if (OpenSRF::Application->application_implementation->can('child_exit'));
-}
-
-
-1;
-



More information about the opensrf-commits mailing list