[OpenSRF-GIT] OpenSRF branch master updated. aeeb4acdc8695a640021dbc6902ab3279652583d

Evergreen Git git at git.evergreen-ils.org
Mon Feb 20 14:33:30 EST 2012


This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "OpenSRF".

The branch, master has been updated
       via  aeeb4acdc8695a640021dbc6902ab3279652583d (commit)
       via  08ee4f993fe773e37233b139961cbcdae2fe93b8 (commit)
      from  04558f38c1c1d314acb978a37193dacb4a6eba31 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit aeeb4acdc8695a640021dbc6902ab3279652583d
Author: Bill Erickson <berick at esilibrary.com>
Date:   Tue Feb 14 09:10:58 2012 -0500

    Perl parent/child write improvements
    
    * Updated variable names for clarity
    * Added more inline comments
    * Added additional error logging
    * For severe read errors, allow the child to gracefully skip the request
    
    Signed-off-by: Bill Erickson <berick at esilibrary.com>
    Signed-off-by: Jason Stephenson <jstephenson at mvlc.org>
    Signed-off-by: Mike Rylander <mrylander at gmail.com>

diff --git a/src/perl/lib/OpenSRF/Server.pm b/src/perl/lib/OpenSRF/Server.pm
index 32954f2..6cab1dd 100644
--- a/src/perl/lib/OpenSRF/Server.pm
+++ b/src/perl/lib/OpenSRF/Server.pm
@@ -587,13 +587,13 @@ sub run {
 sub wait_for_request {
     my $self = shift;
 
-    my $data = '';
-    my $buf_size = 4096;
-    my $nonblock = 0;
+    my $data = ''; # final request data
+    my $buf_size = 4096; # default linux pipe_buf (atomic window, not total size)
     my $read_pipe = $self->{pipe_to_parent};
-    my $data_size;
-    my $total_read;
-    my $first_read = 1;
+    my $bytes_needed; # size of the data we are about to receive
+    my $bytes_recvd; # number of bytes read so far
+    my $first_read = 1; # true for first loop iteration
+    my $read_error;
 
     while (1) {
 
@@ -611,39 +611,45 @@ sub wait_for_request {
         $self->set_nonblock($read_pipe);
 
         while (1) {
-            # pull as much data from the pipe as possible
+            # read all of the available data
 
             my $buf = '';
-            my $bytes_read = sysread($self->{pipe_to_parent}, $buf, $buf_size);
-
-            unless(defined $bytes_read) {
-                $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!;
+            my $nbytes = sysread($self->{pipe_to_parent}, $buf, $buf_size);
+
+            unless(defined $nbytes) {
+                if ($! != EAGAIN) {
+                    $logger->error("server: error reading data from parent: $!.  ".
+                        "bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd; data=$data");
+                    $read_error = 1;
+                }
                 last;
             }
 
-            last if $bytes_read <= 0; # no more data available for reading
+            last if $nbytes <= 0; # no more data available for reading
 
-            $total_read += $bytes_read;
+            $bytes_recvd += $nbytes;
             $data .= $buf;
         }
 
-        # we've read all the data currently available on the pipe.
-        # let's see if we're done.
+        $self->set_block($self->{pipe_to_parent});
+        return undef if $read_error;
 
+        # extract the data size and remove the header from the final data
         if ($first_read) {
-            # extract the data size and remove the size header
             my $wps_size = OpenSRF::Server::WRITE_PIPE_DATA_SIZE;
-            $data_size = int(substr($data, 0, $wps_size)) + $wps_size;
+            $bytes_needed = int(substr($data, 0, $wps_size)) + $wps_size;
             $data = substr($data, $wps_size);
             $first_read = 0;
         }
 
-        $self->set_block($self->{pipe_to_parent});
 
-        if ($total_read == $data_size) {
+        if ($bytes_recvd == $bytes_needed) {
             # we've read all the data. Nothing left to do
             last;
         }
+
+        $logger->info("server: child process read all available pipe data.  ".
+            "waiting for more data from parent.  bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd");
     }
 
     return $data;

commit 08ee4f993fe773e37233b139961cbcdae2fe93b8
Author: Bill Erickson <berick at esilibrary.com>
Date:   Mon Feb 13 16:53:59 2012 -0500

    Perl pipe reading overhaul : data size header
    
    The lockfile mechanism for preventing premature end of reads on child
    processes suffers from one serious flaw:  if the data to write exceeds
    the pipe buffer size, the parent will block on syswrite and the service
    will lock up.  It's also not as efficient (for the normal case) as the
    code was without the lockfile, because the writes and reads are
    serialized.
    
    This commit replaces the lockfile mechanism with a protocol header in
    the data.  The first X (currently 12) bytes of data written to the child
    process will contain the full length of the data to be written (minus
    the header size).  The child now reads the data in parallel with the parent as
    data is available.  If the child reads all available data (in the pipe)
    but not all of the expected data, the child will go back into a select()
    wait pending more data from the parent.  The process continues until all
    data is read.
    
    This same mechanism is already used to communicate status info from child
    processes to the parent.
    
    Signed-off-by: Bill Erickson <berick at esilibrary.com>
    Signed-off-by: Jason Stephenson <jstephenson at mvlc.org>
    Signed-off-by: Mike Rylander <mrylander at gmail.com>

diff --git a/src/perl/lib/OpenSRF/Server.pm b/src/perl/lib/OpenSRF/Server.pm
index 90a186d..32954f2 100644
--- a/src/perl/lib/OpenSRF/Server.pm
+++ b/src/perl/lib/OpenSRF/Server.pm
@@ -24,13 +24,14 @@ use OpenSRF::Utils::Logger qw($logger);
 use OpenSRF::Transport::SlimJabber::Client;
 use Encode;
 use POSIX qw/:sys_wait_h :errno_h/;
-use Fcntl qw(:flock F_GETFL F_SETFL O_NONBLOCK);
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Time::HiRes qw/usleep/;
 use IO::Select;
 use Socket;
 our $chatty = 1; # disable for production
 
 use constant STATUS_PIPE_DATA_SIZE => 12;
+use constant WRITE_PIPE_DATA_SIZE  => 12;
 
 sub new {
     my($class, $service, %args) = @_;
@@ -48,9 +49,6 @@ sub new {
     $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log" 
         if $self->{stderr_log_path};
 
-    $self->{lock_file} = 
-        sprintf("%s/%s_$$.lock", $self->{lock_file_path}, $self->{service});
-
     $self->{min_spare_children} ||= 0;
 
     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
@@ -85,19 +83,9 @@ sub cleanup {
     # clean up our dead children
     $self->reap_children(1);
 
-    # clean up the lock file
-    close($self->{lock_file_handle});
-    unlink($self->{lock_file});
-
     exit(0) unless $no_exit;
 }
 
-sub open_lock_file {
-    my $self = shift;
-    open($self->{lock_file_handle}, '>>', $self->{lock_file})
-        or die "server: cannot open lock file ".$self->{lock_file} ." : $!\n";
-}
-
 # ----------------------------------------------------------------
 # Waits on the jabber socket for inbound data from the router.
 # Each new message is passed off to a child process for handling.
@@ -114,7 +102,6 @@ sub run {
     $self->spawn_children;
     $self->build_osrf_handle;
     $self->register_routers;
-    $self->open_lock_file;
     my $wait_time = 1;
 
     # main server loop
@@ -253,8 +240,10 @@ sub write_child {
     my($self, $child, $msg) = @_;
     my $xml = encode_utf8(decode_utf8($msg->to_xml));
 
-    flock($self->{lock_file_handle}, LOCK_EX) or 
-        $logger->error("server: cannot flock : $!");
+    # tell the child how much data to expect, minus the header
+    my $write_size;
+    {use bytes; $write_size = length($xml)}
+    $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size);
 
     for (0..2) {
 
@@ -262,16 +251,13 @@ sub write_child {
         local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
 
         # send message to child data pipe
-        syswrite($child->{pipe_to_child}, $xml);
+        syswrite($child->{pipe_to_child}, $write_size . $xml);
 
         last unless $self->{sig_pipe};
         $logger->error("server: got SIGPIPE writing to $child, retrying...");
         usleep(50000); # 50 msec
     }
 
-    flock($self->{lock_file_handle}, LOCK_UN) or 
-        $logger->error("server: cannot de-flock : $!");
-
     $logger->error("server: unable to send request message to child $child") if $self->{sig_pipe};
 }
 
@@ -508,7 +494,7 @@ use OpenSRF::Transport::PeerHandle;
 use OpenSRF::Transport::SlimJabber::XMPPMessage;
 use OpenSRF::Utils::Logger qw($logger);
 use OpenSRF::DomainObject::oilsResponse qw/:status/;
-use Fcntl qw(:flock F_GETFL F_SETFL O_NONBLOCK);
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Time::HiRes qw(time usleep);
 use POSIX qw/:sys_wait_h :errno_h/;
 
@@ -537,26 +523,11 @@ sub set_block {
     fcntl($fh, F_SETFL, $flags);
 }
 
-sub open_lock_file {
-    my $self = shift;
-
-    # close our copy of the parent's lock file since we won't be using it
-    close($self->{parent}->{lock_file_handle}) 
-        if $self->{parent}->{lock_file_handle};
-
-    my $fname = $self->{parent}->{lock_file};
-    unless (open($self->{lock_file_handle}, '>>', $fname)) {
-        $logger->error("server: child cannot open lock file $fname : $!");
-        die "server: child cannot open lock file $fname : $!\n";
-    }
-}
-
 # ----------------------------------------------------------------
 # Connects to Jabber and runs the application child_init
 # ----------------------------------------------------------------
 sub init {
     my $self = shift;
-    $self->open_lock_file;
     my $service = $self->{parent}->{service};
     $0 = "OpenSRF Drone [$service]";
     OpenSRF::Transport::PeerHandle->construct($service);
@@ -617,59 +588,64 @@ sub wait_for_request {
     my $self = shift;
 
     my $data = '';
-    my $read_size = 1024;
+    my $buf_size = 4096;
     my $nonblock = 0;
     my $read_pipe = $self->{pipe_to_parent};
-
-    # wait for some data to start arriving
-    my $read_set = IO::Select->new;
-    $read_set->add($read_pipe);
+    my $data_size;
+    my $total_read;
+    my $first_read = 1;
 
     while (1) {
-        # if can_read is interrupted while blocking, 
-        # go back and wait again until it it succeeds.
-        last if $read_set->can_read;
-    }
 
-    # Parent has started sending data to us.
-    # Wait for the parent to write all data to the pipe.  Then, immediately release 
-    # the lock so the parent can start writing data to other child processes.  
-    # Note: there is no danger of a subsequent message coming from the parent on 
-    # the same pipe, since this child is now marked as active.
-    flock($self->{lock_file_handle}, LOCK_EX) or $logger->error("server: cannot flock : $!");
-    flock($self->{lock_file_handle}, LOCK_UN) or $logger->error("server: cannot de-flock : $!");
-
-    # we have all data now so all reads can be done in non-blocking mode
-    $self->set_nonblock($read_pipe);
+        # wait for some data to start arriving
+        my $read_set = IO::Select->new;
+        $read_set->add($read_pipe);
+    
+        while (1) {
+            # if can_read is interrupted while blocking, 
+            # go back and wait again until it succeeds.
+            last if $read_set->can_read;
+        }
 
-    while(1) {
-        my $sig_pipe = 0;
-        local $SIG{'PIPE'} = sub { $sig_pipe = 1 };
+        # parent started writing, let's start reading
+        $self->set_nonblock($read_pipe);
 
-        my $buf = '';
-        my $n = sysread($self->{pipe_to_parent}, $buf, $read_size);
+        while (1) {
+            # pull as much data from the pipe as possible
 
-        unless(defined $n) {
+            my $buf = '';
+            my $bytes_read = sysread($self->{pipe_to_parent}, $buf, $buf_size);
 
-            if ($sig_pipe) {
-                $logger->info("server: $self got SIGPIPE reading data from parent, retrying...");
-                usleep(50000); # 50 msec
-                next;
+            unless(defined $bytes_read) {
+                $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!;
+                last;
             }
 
-            $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!; 
-            last;
+            last if $bytes_read <= 0; # no more data available for reading
+
+            $total_read += $bytes_read;
+            $data .= $buf;
         }
 
-        last if $n <= 0; # no data left to read
+        # we've read all the data currently available on the pipe.
+        # let's see if we're done.
+
+        if ($first_read) {
+            # extract the data size and remove the size header
+            my $wps_size = OpenSRF::Server::WRITE_PIPE_DATA_SIZE;
+            $data_size = int(substr($data, 0, $wps_size)) + $wps_size;
+            $data = substr($data, $wps_size);
+            $first_read = 0;
+        }
 
-        $data .= $buf;
+        $self->set_block($self->{pipe_to_parent});
 
-        last if $n < $read_size; # done reading all data
+        if ($total_read == $data_size) {
+            # we've read all the data. Nothing left to do
+            last;
+        }
     }
 
-    # return to blocking mode
-    $self->set_block($self->{pipe_to_parent});
     return $data;
 }
 
diff --git a/src/perl/lib/OpenSRF/System.pm b/src/perl/lib/OpenSRF/System.pm
index a720d26..7fd7195 100644
--- a/src/perl/lib/OpenSRF/System.pm
+++ b/src/perl/lib/OpenSRF/System.pm
@@ -99,8 +99,7 @@ sub run_service {
         min_children =>  $getval->(unix_config => 'min_children') || 1,
         min_spare_children =>  $getval->(unix_config => 'min_spare_children'),
         max_spare_children =>  $getval->(unix_config => 'max_spare_children'),
-        stderr_log_path => $stderr_path,
-        lock_file_path => $pid_dir
+        stderr_log_path => $stderr_path
     );
 
     while(1) {

-----------------------------------------------------------------------

Summary of changes:
 src/perl/lib/OpenSRF/Server.pm |  132 +++++++++++++++++-----------------------
 src/perl/lib/OpenSRF/System.pm |    3 +-
 2 files changed, 58 insertions(+), 77 deletions(-)


hooks/post-receive
-- 
OpenSRF


More information about the opensrf-commits mailing list