[Opensrf-commits] r2016 - in trunk: bin src/perl/lib/OpenSRF src/perl/lib/OpenSRF/Transport/SlimJabber (erickson)
svn at svn.open-ils.org
svn at svn.open-ils.org
Tue Aug 31 20:17:13 EDT 2010
Author: erickson
Date: 2010-08-31 20:17:12 -0400 (Tue, 31 Aug 2010)
New Revision: 2016
Added:
trunk/src/perl/lib/OpenSRF/Server.pm
Removed:
trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Inbound.pm
trunk/src/perl/lib/OpenSRF/UnixServer.pm
Modified:
trunk/bin/opensrf-perl.pl.in
trunk/src/perl/lib/OpenSRF/Application.pm
trunk/src/perl/lib/OpenSRF/System.pm
trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm
trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm
Log:
Replace Net::Server with local pre-forking server
Support max/min children and max/min spare children
For more, see http://libmail.georgialibraries.org/pipermail/open-ils-dev/2010-May/006068.html
Modified: trunk/bin/opensrf-perl.pl.in
===================================================================
--- trunk/bin/opensrf-perl.pl.in 2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/bin/opensrf-perl.pl.in 2010-09-01 00:17:12 UTC (rev 2016)
@@ -110,6 +110,7 @@
# start a specific service
sub do_start {
my $service = shift;
+
if(-e get_pid_file($service)) {
msg("$service is already running");
return;
@@ -117,19 +118,9 @@
load_settings() if $service eq 'opensrf.settings';
- my $sclient = OpenSRF::Utils::SettingsClient->new;
- my $apps = $sclient->config_value("activeapps", "appname");
- OpenSRF::Transport::PeerHandle->retrieve->disconnect;
-
if(grep { $_ eq $service } @hosted_services) {
return unless do_daemon($service);
- launch_net_server($service);
- launch_listener($service);
- $0 = "OpenSRF controller [$service]";
- while(my $pid = waitpid(-1, 0)) {
- last if $pid == -1;
- $logger->debug("Cleaning up Perl $service process $pid");
- }
+ OpenSRF::System->run_service($service);
}
msg("$service is not configured to run on $hostname");
@@ -169,6 +160,9 @@
close STDIN;
close STDOUT;
close STDERR;
+ open STDIN, '</dev/null';
+ open STDOUT, '>/dev/null';
+ open STDERR, '>/dev/null';
`echo $$ > $pid_file`;
return 1;
}
@@ -184,29 +178,6 @@
$parser->get_server_config($conf->env->hostname);
}
-# starts up the unix::server master process
-sub launch_net_server {
- my $service = shift;
- push @OpenSRF::UnixServer::ISA, 'Net::Server::PreFork';
- unless(OpenSRF::Utils::safe_fork()) {
- $0 = "OpenSRF Drone [$service]";
- OpenSRF::UnixServer->new($service)->serve;
- exit;
- }
- return 1;
-}
-
-# starts up the inbound listener process
-sub launch_listener {
- my $service = shift;
- unless(OpenSRF::Utils::safe_fork()) {
- $0 = "OpenSRF listener [$service]";
- OpenSRF::Transport::Listener->new($service)->initialize->listen;
- exit;
- }
- return 1;
-}
-
sub msg {
my $m = shift;
my $v = shift;
Modified: trunk/src/perl/lib/OpenSRF/Application.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/Application.pm 2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/src/perl/lib/OpenSRF/Application.pm 2010-09-01 00:17:12 UTC (rev 2016)
@@ -12,7 +12,6 @@
use OpenSRF::EX qw/:try/;
use Carp;
use OpenSRF::Utils::JSON;
-#use OpenSRF::UnixServer; # to get the server class from UnixServer::App
sub DESTROY{};
Added: trunk/src/perl/lib/OpenSRF/Server.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/Server.pm (rev 0)
+++ trunk/src/perl/lib/OpenSRF/Server.pm 2010-09-01 00:17:12 UTC (rev 2016)
@@ -0,0 +1,596 @@
+# ----------------------------------------------------------------
+# Copyright (C) 2010 Equinox Software, Inc.
+# Bill Erickson <erickson at esilibrary.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+# ----------------------------------------------------------------
+package OpenSRF::Server;
+use strict;
+use warnings;
+use OpenSRF::Transport;
+use OpenSRF::Application;
+use OpenSRF::Utils::Config;
+use OpenSRF::Transport::PeerHandle;
+use OpenSRF::Utils::SettingsClient;
+use OpenSRF::Utils::Logger qw($logger);
+use OpenSRF::Transport::SlimJabber::Client;
+use POSIX qw/:sys_wait_h :errno_h/;
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+use IO::Select;
+use Socket;
+our $chatty = 1; # disable for production
+
+use constant STATUS_PIPE_DATA_SIZE => 12;
+
+sub new {
+    my($class, $service, %args) = @_;
+
+    # The caller's named settings (max_children, keepalive, ...) become
+    # the object's hash; the bookkeeping fields below are always reset.
+    my $self = bless(\%args, $class);
+
+    $self->{service}      = $service; # service name
+    $self->{num_children} = 0;        # number of child processes
+    $self->{osrf_handle}  = undef;    # xmpp handle
+    $self->{routers}      = [];       # list of registered routers
+    $self->{active_list}  = [];       # list of active children
+    $self->{idle_list}    = [];       # list of idle children
+
+    $self->{min_spare_children} ||= 0;
+
+    # A configured max_spare_children that does not exceed
+    # min_spare_children is bumped to min_spare_children + 1.
+    my $max_spare = $self->{max_spare_children};
+    if($max_spare and $max_spare <= $self->{min_spare_children}) {
+        $self->{max_spare_children} = $self->{min_spare_children} + 1;
+    }
+
+    return $self;
+}
+
+# ----------------------------------------------------------------
+# Disconnects from routers and waits for child processes to exit.
+# ----------------------------------------------------------------
+sub cleanup {
+    my $self = shift;
+    my $no_exit = shift; # true: return to the caller instead of exiting
+
+    $logger->info("server: shutting down and cleaning up...");
+
+    # don't get sidetracked by signals while we're cleaning up.
+    # it could result in unexpected behavior with list traversal
+    $SIG{CHLD} = 'IGNORE';
+
+    # terminate the child processes
+    $self->kill_child($_) for
+        (@{$self->{idle_list}}, @{$self->{active_list}});
+
+    # The XMPP handle only exists once build_osrf_handle() has run.
+    # cleanup() can be reached before that (a signal during start-up, or
+    # a failed connection attempt), so only talk to the routers and
+    # disconnect when there is a handle to use.
+    if(my $handle = $self->{osrf_handle}) {
+
+        # de-register routers
+        $self->unregister_routers;
+
+        $handle->disconnect;
+    }
+
+    # clean up our dead children
+    $self->reap_children(1);
+
+    exit(0) unless $no_exit;
+}
+
+
+# ----------------------------------------------------------------
+# Waits on the jabber socket for inbound data from the router.
+# Each new message is passed off to a child process for handling.
+# At regular intervals, wake up for min/max spare child maintenance
+# ----------------------------------------------------------------
+sub run {
+ my $self = shift;
+
+ $logger->set_service($self->{service});
+
+ # INT/TERM/QUIT shut the server down through cleanup();
+ # CHLD reaps exited children through reap_children()
+ $SIG{$_} = sub { $self->cleanup; } for (qw/INT TERM QUIT/);
+ $SIG{CHLD} = sub { $self->reap_children(); };
+
+ # fork the initial children first, then connect to Jabber and
+ # announce this service to the routers
+ $self->spawn_children;
+ $self->build_osrf_handle;
+ $self->register_routers;
+ my $wait_time = 1;
+
+ # main server loop
+ while(1) {
+
+ # non-blocking pass: move children that reported in back to idle_list
+ $self->check_status;
+
+ # reap_children() sets this flag to 1; clear it before waiting so we
+ # can tell below whether a child exited while we were in process()
+ $self->{child_died} = 0;
+
+ # a true return value is treated as an inbound message to dispatch
+ # (presumably $wait_time is in seconds and -1 means no timeout -- confirm
+ # against the transport client)
+ my $msg = $self->{osrf_handle}->process($wait_time);
+
+ # we woke up for any reason, reset the wait time to allow
+ # for idle maintenance as necessary
+ $wait_time = 1;
+
+ if($msg) {
+
+ if(my $child = pop(@{$self->{idle_list}})) {
+
+ # we have an idle child to handle the request
+ $chatty and $logger->internal("server: passing request to idle child $child");
+ push(@{$self->{active_list}}, $child);
+ $self->write_child($child, $msg);
+
+ } elsif($self->{num_children} < $self->{max_children}) {
+
+ # spawning a child to handle the request
+ # (spawn_child(1) places the new child directly on active_list)
+ $chatty and $logger->internal("server: spawning child to handle request");
+ $self->write_child($self->spawn_child(1), $msg);
+
+ } else {
+
+ $logger->warn("server: no children available, waiting...");
+ $self->check_status(1); # block until child is available
+
+ # NOTE(review): check_status() can return without moving any child
+ # to idle_list (e.g. when its sysread yields nothing), in which case
+ # $child is undef here -- confirm this cannot happen in practice
+ my $child = pop(@{$self->{idle_list}});
+ push(@{$self->{active_list}}, $child);
+ $self->write_child($child, $msg);
+ }
+
+ } else {
+
+ # don't perform idle maint immediately when woken by SIGCHLD
+ unless($self->{child_died}) {
+
+ # when we hit equilibrium, there's no need for regular
+ # maintenance, so set wait_time to 'forever'
+ $wait_time = -1 unless $self->perform_idle_maintenance;
+ }
+ }
+ }
+}
+
+# ----------------------------------------------------------------
+# Launch a new spare child or kill an extra spare child. To
+# prevent large-scale spawning or die-offs, spawn or kill only
+# 1 process per idle maintenance loop.
+# Returns true if any idle maintenance occurred, 0 otherwise
+# ----------------------------------------------------------------
+sub perform_idle_maintenance {
+    my $self = shift;
+    my $idle_count = scalar(@{$self->{idle_list}});
+
+    # Too few spares: spawn exactly one, provided min_spare_children is
+    # configured and the max_children ceiling has not been reached.
+    my $needs_spare =
+        $self->{min_spare_children} &&
+        $self->{num_children} < $self->{max_children} &&
+        $idle_count < $self->{min_spare_children};
+
+    if($needs_spare) {
+        $chatty and $logger->internal("server: spawning spare child");
+        $self->spawn_child;
+        return 1;
+    }
+
+    # Too many spares: kill exactly one, provided max_spare_children is
+    # configured and we stay above the min_children floor.
+    my $has_excess =
+        $self->{max_spare_children} &&
+        $self->{num_children} > $self->{min_children} &&
+        $idle_count > $self->{max_spare_children};
+
+    if($has_excess) {
+        $chatty and $logger->internal("server: killing spare child");
+        $self->kill_child;
+        return 1;
+    }
+
+    # nothing to do; the caller may stop waking up for maintenance
+    return 0;
+}
+
+# Sends TERM to the given child. With no child argument, the most
+# recently added idle child is removed from idle_list and used; if there
+# is none, nothing happens.
+sub kill_child {
+    my($self, $child) = @_;
+
+    $child ||= pop(@{$self->{idle_list}});
+    return unless $child;
+
+    $chatty and $logger->internal("server: killing child $child");
+    kill('TERM', $child->{pid});
+}
+
+# ----------------------------------------------------------------
+# Builds the Jabber connection that inbound messages arrive on.
+# ----------------------------------------------------------------
+sub build_osrf_handle {
+    my $self = shift;
+
+    # login details come from the bootstrap section of the current config
+    my $conf = OpenSRF::Utils::Config->current;
+    my $username = $conf->bootstrap->username;
+    my $password = $conf->bootstrap->passwd;
+    my $domain = $conf->bootstrap->domain;
+    my $port = $conf->bootstrap->port;
+
+    # the XMPP resource is built from the service name and the local hostname
+    my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
+
+    $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
+
+    my $handle = OpenSRF::Transport::SlimJabber::Client->new(
+        username => $username,
+        resource => $resource,
+        password => $password,
+        host => $domain,
+        port => $port,
+    );
+
+    $self->{osrf_handle} = $handle;
+    return $handle->initialize;
+}
+
+
+# ----------------------------------------------------------------
+# Sends request data to a child process
+# ----------------------------------------------------------------
+sub write_child {
+    my($self, $child, $msg) = @_;
+
+    # spawn_child() returns undef on failure and callers pass its result
+    # straight through, so refuse to write when there is no usable child.
+    unless($child and $child->{pipe_to_child}) {
+        $logger->error("server: no child available to receive request, dropping message");
+        return;
+    }
+
+    my $xml = $msg->to_xml;
+
+    # syswrite returns undef on error; report it instead of losing the
+    # request silently.  The byte count is returned as before.
+    my $written = syswrite($child->{pipe_to_child}, $xml);
+    $logger->error("server: error writing request to child $child: $!")
+        unless defined $written;
+
+    return $written;
+}
+
+# ----------------------------------------------------------------
+# Checks to see if any child process has reported its availability
+# In blocking mode, blocks until a child has reported.
+# ----------------------------------------------------------------
+sub check_status {
+    my($self, $block) = @_;
+
+    # watch the parent's end of the socketpair of every active child
+    my $select = IO::Select->new;
+    for my $active (@{$self->{active_list}}) {
+        $select->add($active->{pipe_to_child});
+    }
+
+    # undef timeout blocks until something is readable; 0 just polls
+    my $timeout = ($block) ? undef : 0;
+    my @ready = $select->can_read($timeout) or return;
+
+    # each status message is a fixed-width field holding the child's pid
+    my @pids;
+    for my $pipe (@ready) {
+        my $buf = '';
+        sysread($pipe, $buf, STATUS_PIPE_DATA_SIZE) or next;
+        push(@pids, int($buf));
+    }
+
+    $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
+
+    # move the children that reported from the active list to the idle
+    # list; everyone else stays active, in their original order
+    my %reported = map { $_ => 1 } @pids;
+    my @still_active;
+
+    for my $proc (@{$self->{active_list}}) {
+        if($reported{$proc->{pid}}) {
+            push(@{$self->{idle_list}}, $proc);
+        } else {
+            push(@still_active, $proc);
+        }
+    }
+
+    $self->{active_list} = [@still_active];
+
+    $chatty and $logger->internal(sprintf(
+        "server: %d idle and %d active children after status update",
+        scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
+}
+
+# ----------------------------------------------------------------
+# Cleans up any child processes that have exited.
+# In shutdown mode, block until all children have washed ashore
+# ----------------------------------------------------------------
+sub reap_children {
+    my($self, $shutdown) = @_;
+    $self->{child_died} = 1;
+
+    while(1) {
+
+        # shutdown mode blocks in waitpid; otherwise just poll
+        my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG);
+
+        # Leave the loop (not the sub) once nothing is left to reap, so
+        # the respawn and the summary log below actually run.
+        last if $pid <= 0;
+
+        $chatty and $logger->internal("server: reaping child $pid");
+
+        my ($child) = grep {$_->{pid} == $pid} (@{$self->{active_list}}, @{$self->{idle_list}});
+
+        # Remove the child from both lists while its pid is still set.
+        # Wiping the hash first would make these comparisons fail and
+        # leave the dead child in the lists.
+        $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ];
+        $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ];
+
+        # A spare killed through kill_child() was already popped off
+        # idle_list, so it will not be found here; it still counts below.
+        if($child) {
+            close($child->{pipe_to_parent});
+            close($child->{pipe_to_child});
+            delete $child->{$_} for keys %$child; # destroy with a vengeance
+        }
+
+        $self->{num_children}--;
+    }
+
+    # bring the pool back up to min_children unless we are shutting down
+    $self->spawn_children unless $shutdown;
+
+    $chatty and $logger->internal(sprintf(
+        "server: %d idle and %d active children after reap_children",
+        scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
+
+}
+
+# ----------------------------------------------------------------
+# Spawn children until min_children processes are running
+# ----------------------------------------------------------------
+sub spawn_children {
+    my $self = shift;
+
+    while($self->{num_children} < $self->{min_children}) {
+        # spawn_child returns undef when it cannot create a child; stop
+        # rather than retrying in a tight loop.  The next reap will
+        # try again.
+        $self->spawn_child or last;
+    }
+}
+
+# ----------------------------------------------------------------
+# Spawns a new child.  If $active is set, the child goes directly
+# into the active_list, otherwise into the idle_list.
+# Returns the child object in the parent process, or undef if the
+# socketpair or the fork could not be created.  The child process
+# never returns from this method.
+# ----------------------------------------------------------------
+sub spawn_child {
+    my($self, $active) = @_;
+
+    my $child = OpenSRF::Server::Child->new($self);
+
+    # socket for sending message data to the child
+    # (explicit undef is returned on failure because a caller passes the
+    # result inline as a method argument)
+    if(!socketpair(
+        $child->{pipe_to_child},
+        $child->{pipe_to_parent},
+        AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
+        $logger->error("server: error creating data socketpair: $!");
+        return undef;
+    }
+
+    $child->{pipe_to_child}->autoflush(1);
+    $child->{pipe_to_parent}->autoflush(1);
+
+    my $pid = fork();
+
+    # fork returns undef on failure.  Without this check the parent
+    # would fall into the child branch below, run as a drone and exit.
+    unless(defined $pid) {
+        $logger->error("server: unable to fork child process: $!");
+        close($child->{pipe_to_child});
+        close($child->{pipe_to_parent});
+        return undef;
+    }
+
+    $child->{pid} = $pid;
+
+    if($pid) { # parent process
+        $self->{num_children}++;
+
+        if($active) {
+            push(@{$self->{active_list}}, $child);
+        } else {
+            push(@{$self->{idle_list}}, $child);
+        }
+
+        $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
+
+        return $child;
+
+    } else { # child process
+
+        # Drop every handler inherited from the parent.  CHLD is included
+        # so a drone never runs the parent's reap/respawn logic.
+        $SIG{$_} = 'DEFAULT' for (qw/INT TERM QUIT HUP CHLD/);
+
+        $child->{pid} = $$;
+        eval {
+            $child->init;
+            $child->run;
+            OpenSRF::Transport::PeerHandle->retrieve->disconnect;
+        };
+        $logger->error("server: child process died: $@") if $@;
+        exit(0);
+    }
+}
+
+# ----------------------------------------------------------------
+# Sends the register command to the configured routers
+# ----------------------------------------------------------------
+sub register_routers {
+ my $self = shift;
+
+ # router definitions and the default router name come from the
+ # bootstrap section of the current config
+ my $conf = OpenSRF::Utils::Config->current;
+ my $routers = $conf->bootstrap->routers;
+ my $router_name = $conf->bootstrap->router_name;
+ my @targets;
+
+ for my $router (@$routers) {
+ if(ref $router) {
+
+ # structured entry: register when it has no service restriction,
+ # or when its service entry (a list or a single value) names
+ # this service
+ if( !$router->{services} ||
+ !$router->{services}->{service} ||
+ (
+ ref($router->{services}->{service}) eq 'ARRAY' and
+ grep { $_ eq $self->{service} } @{$router->{services}->{service}}
+ ) || $router->{services}->{service} eq $self->{service}) {
+
+ my $name = $router->{name};
+ my $domain = $router->{domain};
+ push(@targets, "$name\@$domain/router");
+ }
+
+ } else {
+ # plain string entry: treated as a domain, combined with the
+ # configured router_name
+ push(@targets, "$router_name\@$router/router");
+ }
+ }
+
+ # send the register command to every selected router address
+ foreach (@targets) {
+ $logger->info("server: registering with router $_");
+ $self->{osrf_handle}->send(
+ to => $_,
+ body => 'registering',
+ router_command => 'register',
+ router_class => $self->{service}
+ );
+ }
+
+ # remembered so unregister_routers() can contact the same addresses
+ $self->{routers} = \@targets;
+}
+
+# ----------------------------------------------------------------
+# Sends the unregister command to any routers we have registered
+# with.
+# ----------------------------------------------------------------
+sub unregister_routers {
+    my $self = shift;
+
+    # The handle is undef until build_osrf_handle() has run; this method
+    # is reached from cleanup(), which may be called before that.  There
+    # is nothing to unregister without a connected handle.
+    my $handle = $self->{osrf_handle};
+    return unless $handle and $handle->tcp_connected;
+
+    # tell every router we registered with that this service is leaving
+    for my $router (@{$self->{routers}}) {
+        $logger->info("server: disconnecting from router $router");
+        $handle->send(
+            to => $router,
+            body => "unregistering",
+            router_command => "unregister",
+            router_class => $self->{service}
+        );
+    }
+}
+
+
+package OpenSRF::Server::Child;
+use strict;
+use warnings;
+use OpenSRF::Transport;
+use OpenSRF::Application;
+use OpenSRF::Transport::PeerHandle;
+use OpenSRF::Transport::SlimJabber::XMPPMessage;
+use OpenSRF::Utils::Logger qw($logger);
+use OpenSRF::DomainObject::oilsResponse qw/:status/;
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+use Time::HiRes qw(time);
+use POSIX qw/:sys_wait_h :errno_h/;
+
+use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
+
+# Builds the bookkeeping object for one child process.  $parent is the
+# OpenSRF::Server instance that owns the child.
+sub new {
+    my($class, $parent) = @_;
+
+    return bless({
+        pid          => 0,       # my process ID
+        parent       => $parent, # Controller parent process
+        num_requests => 0,       # total serviced requests
+    }, $class);
+}
+
+# Adds O_NONBLOCK to the file status flags of $fh.
+sub set_nonblock {
+    my($self, $fh) = @_;
+    fcntl($fh, F_SETFL, fcntl($fh, F_GETFL, 0) | O_NONBLOCK);
+}
+
+# Clears O_NONBLOCK from the file status flags of $fh.
+sub set_block {
+    my($self, $fh) = @_;
+    my $current = fcntl($fh, F_GETFL, 0);
+    fcntl($fh, F_SETFL, $current & ~O_NONBLOCK);
+}
+
+# ----------------------------------------------------------------
+# Connects to Jabber and runs the application child_init
+# ----------------------------------------------------------------
+sub init {
+    my $self = shift;
+    my $service = $self->{parent}->{service};
+
+    # label the process so drones are recognisable in the process list
+    $0 = "OpenSRF Drone [$service]";
+
+    OpenSRF::Transport::PeerHandle->construct($service);
+
+    # give the application a chance to do per-child setup, if it wants to
+    my $impl = OpenSRF::Application->application_implementation;
+    $impl->child_init if $impl->can('child_init');
+}
+
+# ----------------------------------------------------------------
+# Waits for messages from the parent process, handles the message,
+# then goes into the keepalive loop if this is a stateful session.
+# When max_requests is hit, the process exits.
+# ----------------------------------------------------------------
+sub run {
+ my $self = shift;
+ my $network = OpenSRF::Transport::PeerHandle->retrieve;
+
+ # main child run loop. Ends when this child hits max requests.
+ while(1) {
+
+ # wait_for_request returns '' when nothing could be read; try again
+ # NOTE(review): if the parent's end of the socket is closed, sysread
+ # presumably keeps returning 0 and this retries immediately -- confirm
+ # whether the child should exit in that case
+ my $data = $self->wait_for_request or next;
+
+ # Update process name to show activity
+ my $orig_name = $0;
+ $0 = "$0*";
+
+ # Discard extraneous data from the jabber socket
+ if(!$network->flush_socket()) {
+ $logger->error("server: network disconnected! child dropping request and exiting: $data");
+ exit;
+ }
+
+ # hand the raw XML from the parent to the transport layer as an
+ # XMPP message; the handler returns the session for this request
+ my $session = OpenSRF::Transport->handler(
+ $self->{parent}->{service},
+ OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
+ );
+
+ # stays in here while the session is in the CONNECTED state
+ $self->keepalive_loop($session);
+
+ # leave the loop without reporting availability once this child
+ # has served the parent's max_requests
+ last if ++$self->{num_requests} == $self->{parent}->{max_requests};
+
+ # Tell the parent process we are available to process requests
+ $self->send_status;
+
+ # Repair process name
+ $0 = $orig_name;
+ }
+
+ $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
+
+ # give the application a chance to do per-child teardown
+ OpenSRF::Application->application_implementation->child_exit
+ if (OpenSRF::Application->application_implementation->can('child_exit'));
+}
+
+# ----------------------------------------------------------------
+# waits for a request data on the parent pipe and returns it.
+# ----------------------------------------------------------------
+sub wait_for_request {
+ my $self = shift;
+
+ my $data = ''; # everything read so far
+ my $read_size = 1024; # bytes requested per sysread
+ my $nonblock = 0; # true once the socket was switched to non-blocking
+
+ while(1) {
+ # Start out blocking, when data is available, read it all
+
+ my $buf = '';
+ my $n = sysread($self->{pipe_to_parent}, $buf, $read_size);
+
+ # undef means the read failed; EAGAIN is not logged as an error
+ # (expected once the socket is non-blocking and drained)
+ unless(defined $n) {
+ $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!;
+ last;
+ }
+
+ last if $n <= 0; # no data left to read
+
+ $data .= $buf;
+
+ last if $n < $read_size; # done reading all data
+
+ # a full buffer may mean more data is pending: keep reading, but
+ # without blocking, so an exact multiple of $read_size cannot hang
+ $self->set_nonblock($self->{pipe_to_parent}) unless $nonblock;
+ $nonblock = 1;
+ }
+
+ # restore blocking mode for the next request
+ $self->set_block($self->{pipe_to_parent}) if $nonblock;
+
+ # '' when the first read returned no data or failed
+ return $data;
+}
+
+
+# ----------------------------------------------------------------
+# If this is a stateful opensrf session, wait up to $keepalive
+# seconds for subsequent requests from the client
+# ----------------------------------------------------------------
+sub keepalive_loop {
+    my($self, $session) = @_;
+    my $keepalive = $self->{parent}->{keepalive};
+
+    while($session->state and $session->state == $session->CONNECTED) {
+
+        # something arrived within the keepalive window: keep going
+        next if $session->queue_wait($keepalive);
+
+        # client failed to disconnect before timeout
+        $logger->info("server: no request was received in $keepalive seconds, exiting stateful session");
+
+        # tell the client the session timed out, then mark it closed
+        my $timeout_status = OpenSRF::DomainObject::oilsConnectStatus->new(
+            status => "Disconnected on timeout",
+            statusCode => STATUS_TIMEOUT
+        );
+
+        $session->status($timeout_status);
+        $session->state($session->DISCONNECTED);
+        last;
+    }
+
+    $chatty and $logger->internal("server: child done with request(s)");
+    $session->kill_me;
+}
+
+# ----------------------------------------------------------------
+# Report our availability to our parent process
+# ----------------------------------------------------------------
+sub send_status {
+    my $self = shift;
+
+    # the pid is right-justified in a fixed-width field so the parent
+    # can read exactly one status message at a time
+    my $status = sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid});
+
+    syswrite($self->{pipe_to_parent}, $status);
+}
+
+
+1;
Modified: trunk/src/perl/lib/OpenSRF/System.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/System.pm 2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/src/perl/lib/OpenSRF/System.pm 2010-09-01 00:17:12 UTC (rev 2016)
@@ -2,10 +2,9 @@
use strict; use warnings;
use OpenSRF;
use base 'OpenSRF';
-use OpenSRF::Utils::Logger qw(:level);
+use OpenSRF::Utils::Logger qw($logger);
use OpenSRF::Transport::Listener;
use OpenSRF::Transport;
-use OpenSRF::UnixServer;
use OpenSRF::Utils;
use OpenSRF::EX qw/:try/;
use POSIX qw/setsid :sys_wait_h/;
@@ -13,7 +12,7 @@
use OpenSRF::Utils::SettingsParser;
use OpenSRF::Utils::SettingsClient;
use OpenSRF::Application;
-use Net::Server::PreFork;
+use OpenSRF::Server;
my $bootstrap_config_file;
sub import {
@@ -67,4 +66,47 @@
return 0;
}
+# Initializes the application for $service, then builds an OpenSRF::Server
+# for it and runs that server in an endless restart loop.  The loop has
+# no exit condition of its own.
+sub run_service {
+ my($class, $service) = @_;
+
+ $0 = "OpenSRF Listener [$service]";
+
+ # temp connection to use for application initialization
+ OpenSRF::System->bootstrap_client(client_name => "system_client");
+
+ # $getval looks up settings below apps => $service
+ my $sclient = OpenSRF::Utils::SettingsClient->new;
+ my $getval = sub { $sclient->config_value(apps => $service => @_); };
+
+ my $impl = $getval->('implementation');
+
+ # install the implementation class and run its initialize hook, if any
+ OpenSRF::Application::server_class($service);
+ OpenSRF::Application->application_implementation($impl);
+ OpenSRF::Utils::JSON->register_class_hint(name => $impl, hint => $service, type => 'hash');
+ OpenSRF::Application->application_implementation->initialize()
+ if (OpenSRF::Application->application_implementation->can('initialize'));
+
+ # kill the temp connection
+ OpenSRF::Transport::PeerHandle->retrieve->disconnect;
+
+ # settings that are missing or false fall back to the defaults given
+ # here; the two spare-children settings are passed through as-is
+ my $server = OpenSRF::Server->new(
+ $service,
+ keepalive => $getval->('keepalive') || 5,
+ max_requests => $getval->(unix_config => 'max_requests') || 10000,
+ max_children => $getval->(unix_config => 'max_children') || 20,
+ min_children => $getval->(unix_config => 'min_children') || 1,
+ min_spare_children => $getval->(unix_config => 'min_spare_children'),
+ max_spare_children => $getval->(unix_config => 'max_spare_children')
+ );
+
+ while(1) {
+ eval { $server->run; };
+ # we only arrive here if the server died a painful death
+ $logger->error("server: died with error $@");
+ # cleanup(1): clean up without exiting so the server can be restarted
+ $server->cleanup(1);
+ $logger->info("server: restarting after fatal crash...");
+ sleep 2;
+ }
+}
+
+
1;
Modified: trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm 2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm 2010-09-01 00:17:12 UTC (rev 2016)
@@ -8,13 +8,8 @@
use OpenSRF::Utils::Logger qw/$logger/;
use OpenSRF::Transport::SlimJabber::XMPPReader;
use OpenSRF::Transport::SlimJabber::XMPPMessage;
-use IO::Socket::UNIX;
-use FreezeThaw qw/freeze/;
+use IO::Socket::INET;
-sub DESTROY{
- shift()->disconnect;
-}
-
=head1 NAME
OpenSRF::Transport::SlimJabber::Client
@@ -153,6 +148,7 @@
unless ( $self->reader->connected );
$self->xmpp_id("$username\@$host/$resource");
+ $logger->debug("Created XMPP connection " . $self->xmpp_id);
return $self;
}
Deleted: trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Inbound.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Inbound.pm 2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/Inbound.pm 2010-09-01 00:17:12 UTC (rev 2016)
@@ -1,201 +0,0 @@
-package OpenSRF::Transport::SlimJabber::Inbound;
-use strict;use warnings;
-use base qw/OpenSRF::Transport::SlimJabber::Client/;
-use OpenSRF::EX qw(:try);
-use OpenSRF::Utils::Logger qw(:level);
-use OpenSRF::Utils::SettingsClient;
-use OpenSRF::Utils::Config;
-use Time::HiRes qw/usleep/;
-use FreezeThaw qw/freeze/;
-
-my $logger = "OpenSRF::Utils::Logger";
-
-=head1 Description
-
-This is the jabber connection where all incoming client requests will be accepted.
-This connection takes the data, passes it off to the system then returns to take
-more data. Connection params are all taken from the config file and the values
-retreived are based on the $app name passed into new().
-
-This service should be loaded at system startup.
-
-=cut
-
-{
- my $unix_sock;
- sub unix_sock { return $unix_sock; }
- my $instance;
-
- sub new {
- my( $class, $app ) = @_;
- $class = ref( $class ) || $class;
- if( ! $instance ) {
-
- my $conf = OpenSRF::Utils::Config->current;
- my $domain = $conf->bootstrap->domain;
- $logger->error("use of <domains/> is deprecated") if $conf->bootstrap->domains;
-
- my $username = $conf->bootstrap->username;
- my $password = $conf->bootstrap->passwd;
- my $port = $conf->bootstrap->port;
- my $host = $domain;
- my $resource = $app . '_listener_at_' . $conf->env->hostname;
-
- my $no_router = 0; # make this a config entry if we want to use it
- if($no_router) {
- # no router, only one listener running..
- $username = "router";
- $resource = $app;
- }
-
- OpenSRF::Utils::Logger->transport("Inbound as $username, $password, $resource, $host, $port\n", INTERNAL );
-
- my $self = __PACKAGE__->SUPER::new(
- username => $username,
- resource => $resource,
- password => $password,
- host => $host,
- port => $port,
- );
-
- $self->{app} = $app;
-
- my $client = OpenSRF::Utils::SettingsClient->new();
- my $f = $client->config_value("dirs", "sock");
- $unix_sock = join( "/", $f,
- $client->config_value("apps", $app, "unix_config", "unix_sock" ));
- bless( $self, $class );
- $instance = $self;
- }
- return $instance;
- }
-
-}
-
-sub DESTROY {
- my $self = shift;
- for my $router (@{$self->{routers}}) {
- if($self->tcp_connected()) {
- $logger->info("disconnecting from router $router");
- $self->send( to => $router, body => "registering",
- router_command => "unregister" , router_class => $self->{app} );
- }
- }
-}
-
-my $sig_pipe = 0;
-
-sub listen {
- my $self = shift;
-
- $self->{routers} = [];
-
- try {
-
- my $conf = OpenSRF::Utils::Config->current;
- my $router_name = $conf->bootstrap->router_name;
- my $routers = $conf->bootstrap->routers;
- $logger->info("loading router info $routers");
-
- for my $router (@$routers) {
- if(ref $router) {
- if( !$router->{services} ||
- !$router->{services}->{service} ||
- (
- ref($router->{services}->{service}) eq 'ARRAY' and
- grep { $_ eq $self->{app} } @{$router->{services}->{service}} ) ||
- $router->{services}->{service} eq $self->{app}) {
-
- my $name = $router->{name};
- my $domain = $router->{domain};
- my $target = "$name\@$domain/router";
- push(@{$self->{routers}}, $target);
- $logger->info( $self->{app} . " connecting to router $target");
- $self->send( to => $target, body => "registering", router_command => "register" , router_class => $self->{app} );
- }
- } else {
- my $target = "$router_name\@$router/router";
- push(@{$self->{routers}}, $target);
- $logger->info( $self->{app} . " connecting to router $target");
- $self->send( to => $target, body => "registering", router_command => "register" , router_class => $self->{app} );
- }
- }
-
- } catch Error with {
- my $err = shift;
- $logger->error($self->{app} . ": No routers defined: $err");
- # no routers defined
- };
-
- my $app = $self->{app};
-
- $logger->info("$app inbound: going into listen loop" );
-
- while(1) {
-
- my $sock = $self->unix_sock();
- my $o;
-
- try {
- $o = $self->process(-1);
-
- if(!$o){
- $logger->error("$app inbound: received no data from the Jabber socket in process()");
- usleep(100000); # otherwise we loop and pound syslog logger with errors
- }
-
- } catch OpenSRF::EX::JabberDisconnected with {
-
- $logger->error("$app inbound: process lost its jabber connection. Attempting to reconnect...");
- $self->initialize;
- $o = undef;
- };
-
- next unless $o;
-
- while(1) {
- # keep trying to deliver the message until we succeed
-
- my $socket = IO::Socket::UNIX->new( Peer => $sock );
-
- unless($socket and $socket->connected) {
- $logger->error("$app inbound: unable to connect to inbound socket $sock: $!");
- usleep(50000); # 50 msec
- next;
- }
-
- # block until the pipe is ready for writing
- my $outfile = '';
- vec($outfile, $socket->fileno, 1) = 1;
- my $nfound = select(undef, $outfile, undef, undef);
-
- next unless $nfound; # should not happen since we're blocking
-
- if($nfound == -1) { # select failed
- $logger->error("$app inbound: unable to write to socket: $!");
- usleep(50000); # 50 msec
- next;
- }
-
- $sig_pipe = 0;
- local $SIG{'PIPE'} = sub { $sig_pipe = 1; };
- print $socket freeze($o);
-
- if($sig_pipe) {
- # The attempt to write to the socket failed. Wait a short time then try again.
- # Don't bother closing the socket, it will only cause grief
- $logger->error("$app inbound: got SIGPIPE, will retry after a short wait...");
- usleep(50000); # 50 msec
- next;
- }
-
- $socket->close;
- last;
- }
- }
-
- $logger->error("$app inbound: exited process loop");
-}
-
-1;
-
Modified: trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm 2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm 2010-09-01 00:17:12 UTC (rev 2016)
@@ -60,8 +60,6 @@
OpenSRF::EX::Config->throw( "JPeer could not load all necessary values from config" )
unless ( $username and $password and $resource and $host and $port );
- OpenSRF::Utils::Logger->transport( "Built Peer with", INTERNAL );
-
my $self = __PACKAGE__->SUPER::new(
username => $username,
resource => $resource,
Deleted: trunk/src/perl/lib/OpenSRF/UnixServer.pm
===================================================================
--- trunk/src/perl/lib/OpenSRF/UnixServer.pm 2010-08-31 20:49:44 UTC (rev 2015)
+++ trunk/src/perl/lib/OpenSRF/UnixServer.pm 2010-09-01 00:17:12 UTC (rev 2016)
@@ -1,266 +0,0 @@
-package OpenSRF::UnixServer;
-use strict; use warnings;
-use base qw/OpenSRF/;
-use OpenSRF::EX qw(:try);
-use OpenSRF::Utils::Logger qw(:level $logger);
-use OpenSRF::Transport::PeerHandle;
-use OpenSRF::Application;
-use OpenSRF::AppSession;
-use OpenSRF::DomainObject::oilsResponse qw/:status/;
-use OpenSRF::System;
-use OpenSRF::Utils::SettingsClient;
-use Time::HiRes qw(time);
-use OpenSRF::Utils::JSON;
-use vars qw/@ISA $app/;
-use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
-use Carp;
-use FreezeThaw qw/thaw/;
-
-use IO::Socket::INET;
-use IO::Socket::UNIX;
-
-sub DESTROY { confess "Dying $$"; }
-
-=head1 What am I
-
-All inbound messages are passed on to the UnixServer for processing.
-We take the data, close the Unix socket, and pass the data on to our abstract
-'process()' method.
-
-Our purpose is to 'multiplex' a single TCP connection into multiple 'client' connections.
-So when you pass data down the Unix socket to us, we have been preforked and waiting
-to disperse new data among us.
-
-=cut
-
-sub app { return $app; }
-
-{
-
- sub new {
- my( $class, $app1 ) = @_;
- if( ! $app1 ) {
- throw OpenSRF::EX::InvalidArg( "UnixServer requires an app name to run" );
- }
- $app = $app1;
- my $self = bless( {}, $class );
-# my $client = OpenSRF::Utils::SettingsClient->new();
-# if( $client->config_value("server_type") !~ /fork/i ||
-# OpenSRF::Utils::Config->current->bootstrap->settings_config ) {
-# warn "Calling hooks for non-prefork\n";
-# $self->configure_hook();
-# $self->child_init_hook();
-# }
- return $self;
- }
-
-}
-
-=head2 process_request()
-
-Takes the incoming data, closes the Unix socket and hands the data untouched
-to the abstract process() method. This method is implemented in our subclasses.
-
-=cut
-
-sub process_request {
-
- my $self = shift;
- my $data; my $d;
- while( $d = <STDIN> ) { $data .= $d; }
-
- my $orig = $0;
- $0 = "$0*";
-
- if( ! $data or ! defined( $data ) or $data eq "" ) {
- close($self->{server}->{client});
- $logger->debug("Unix child received empty data from socket", ERROR);
- $0 = $orig;
- return;
- }
-
-
- if( ! close( $self->{server}->{client} ) ) {
- $logger->debug( "Error closing Unix socket: $!", ERROR );
- }
-
- my $app = $self->app();
- $logger->transport( "UnixServer for $app received $data", INTERNAL );
-
- # --------------------------------------------------------------
- # Drop all data from the socket before coninuting to process
- # --------------------------------------------------------------
- my $ph = OpenSRF::Transport::PeerHandle->retrieve;
- if(!$ph->flush_socket()) {
- $logger->error("We received a request ".
- "and we are no longer connected to the jabber network. ".
- "We will go away and drop this request: $data");
- exit;
- }
-
- ($data) = thaw($data);
- my $app_session = OpenSRF::Transport->handler( $self->app(), $data );
-
- if(!ref($app_session)) {
- $logger->transport( "Did not receive AppSession from transport handler, returning...", WARN );
- $0 = $orig;
- return;
- }
-
- if($app_session->stateless and $app_session->state != $app_session->CONNECTED()){
- $logger->debug("Exiting keepalive for stateless session / orig = $orig");
- $app_session->kill_me;
- $0 = $orig;
- return;
- }
-
-
- my $client = OpenSRF::Utils::SettingsClient->new();
- my $keepalive = $client->config_value("apps", $self->app(), "keepalive");
-
- my $req_counter = 0;
- while( $app_session and
- $app_session->state and
- $app_session->state != $app_session->DISCONNECTED() and
- $app_session->find( $app_session->session_id ) ) {
-
-
- my $before = time;
- $logger->debug( "UnixServer calling queue_wait $keepalive", INTERNAL );
- $app_session->queue_wait( $keepalive );
- $logger->debug( "after queue wait $keepalive", INTERNAL );
- my $after = time;
-
- if( ($after - $before) >= $keepalive ) {
-
- my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
- status => "Disconnected on timeout",
- statusCode => STATUS_TIMEOUT);
- $app_session->status($res);
- $app_session->state( $app_session->DISCONNECTED() );
- last;
- }
-
- }
-
- my $x = 0;
- while( $app_session && $app_session->queue_wait(0) ) {
- $logger->debug( "Looping on zombies " . $x++ , DEBUG);
- }
-
- $logger->debug( "Timed out, disconnected, or authentication failed" );
- $app_session->kill_me if ($app_session);
-
- $0 = $orig;
-}
-
-
-sub serve {
- my( $self ) = @_;
-
- my $app = $self->app();
- $logger->set_service($app);
-
- $0 = "OpenSRF master [$app]";
-
- my $client = OpenSRF::Utils::SettingsClient->new();
- my @base = ('apps', $app, 'unix_config' );
-
- my $min_servers = $client->config_value(@base, 'min_children');
- my $max_servers = $client->config_value(@base, "max_children" );
- my $min_spare = $client->config_value(@base, "min_spare_children" );
- my $max_spare = $client->config_value(@base, "max_spare_children" );
- my $max_requests = $client->config_value(@base, "max_requests" );
- # fwiw, these file paths are (obviously) not portable
- my $log_file = join("/", $client->config_value("dirs", "log"), $client->config_value(@base, "unix_log" ));
- my $port = join("/", $client->config_value("dirs", "sock"), $client->config_value(@base, "unix_sock" ));
- my $pid_file = join("/", $client->config_value("dirs", "pid"), $client->config_value(@base, "unix_pid" ));
-
- $min_spare ||= $min_servers;
- $max_spare ||= $max_servers;
- $max_requests ||= 1000;
-
- $logger->info("UnixServer: min=$min_servers, max=$max_servers, min_spare=$min_spare ".
- "max_spare=$max_spare, max_req=$max_requests, log_file=$log_file, port=$port, pid_file=$pid_file");
-
- $self->run(
- min_servers => $min_servers,
- max_servers => $max_servers,
- min_spare_servers => $min_spare,
- max_spare_servers => $max_spare,
- max_requests => $max_requests,
- log_file => $log_file,
- port => $port,
- proto => 'unix',
- pid_file => $pid_file,
- );
-
-}
-
-
-sub configure_hook {
- my $self = shift;
- my $app = $self->app;
-
- # boot a client
- OpenSRF::System->bootstrap_client( client_name => "system_client" );
-
- $logger->debug( "Setting application implementation for $app", DEBUG );
- my $client = OpenSRF::Utils::SettingsClient->new();
- my $imp = $client->config_value("apps", $app, "implementation");
- OpenSRF::Application::server_class($app);
- OpenSRF::Application->application_implementation( $imp );
- OpenSRF::Utils::JSON->register_class_hint( name => $imp, hint => $app, type => "hash" );
- OpenSRF::Application->application_implementation->initialize()
- if (OpenSRF::Application->application_implementation->can('initialize'));
-
- if( $client->config_value("server_type") !~ /fork/i ) {
- $self->child_init_hook();
- }
-
- my $con = OpenSRF::Transport::PeerHandle->retrieve;
- if($con) {
- $con->disconnect;
- }
-
- return OpenSRF::Application->application_implementation;
-}
-
-sub child_init_hook {
-
- $0 =~ s/master/drone/g;
-
- if ($ENV{OPENSRF_PROFILE}) {
- my $file = $0;
- $file =~ s/\W/_/go;
- eval "use Devel::Profiler output_file => '/tmp/profiler_$file.out', buffer_size => 0;";
- if ($@) {
- $logger->debug("Could not load Devel::Profiler: $@",ERROR);
- } else {
- $0 .= ' [PROFILING]';
- $logger->debug("Running under Devel::Profiler", INFO);
- }
- }
-
- my $self = shift;
-
-# $logger->transport(
-# "Creating PeerHandle from UnixServer child_init_hook", INTERNAL );
- OpenSRF::Transport::PeerHandle->construct( $self->app() );
- $logger->transport( "PeerHandle Created from UnixServer child_init_hook", INTERNAL );
-
- OpenSRF::Application->application_implementation->child_init
- if (OpenSRF::Application->application_implementation->can('child_init'));
-
- return OpenSRF::Transport::PeerHandle->retrieve;
-}
-
-sub child_finish_hook {
- $logger->debug("attempting to call child exit handler...");
- OpenSRF::Application->application_implementation->child_exit
- if (OpenSRF::Application->application_implementation->can('child_exit'));
-}
-
-
-1;
-
More information about the opensrf-commits mailing list