[Opensrf-commits] r1296 - in trunk/src/perlmods/OpenSRF: . Transport/SlimJabber

svn at svn.open-ils.org svn at svn.open-ils.org
Mon Mar 31 17:23:45 EDT 2008


Author: erickson
Date: 2008-03-31 16:47:41 -0400 (Mon, 31 Mar 2008)
New Revision: 1296

Added:
   trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm
   trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm
Modified:
   trunk/src/perlmods/OpenSRF/Transport.pm
   trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
   trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Inbound.pm
   trunk/src/perlmods/OpenSRF/Transport/SlimJabber/MessageWrapper.pm
   trunk/src/perlmods/OpenSRF/Transport/SlimJabber/PeerConnection.pm
   trunk/src/perlmods/OpenSRF/UnixServer.pm
Log:

This patch replaces the regex-based XML stream parsing mechanism with an XML::Parser (expat) based parser.
The API and parsing behavior should remain identical.
This requires a new OpenSRF Perl dependency: FreezeThaw.



Modified: trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm	2008-03-31 20:35:20 UTC (rev 1295)
+++ trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm	2008-03-31 20:47:41 UTC (rev 1296)
@@ -1,595 +1,126 @@
 package OpenSRF::Transport::SlimJabber::Client;
 use strict; use warnings;
 use OpenSRF::EX;
-use base qw( OpenSRF );
-use OpenSRF::Utils::Logger qw(:level);
 use OpenSRF::Utils::Config;
-use Time::HiRes qw(ualarm);
-use OpenSRF::Utils::Config;
-
-use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
-use IO::Socket::INET;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::Transport::SlimJabber::XMPPReader;
+use OpenSRF::Transport::SlimJabber::XMPPMessage;
 use IO::Socket::UNIX;
+use FreezeThaw qw/freeze/;
 
-=head1 Description
-
-OpenSRF::Transport::SlimJabber::Client
-
-Home-brewed slimmed down jabber connection agent. Supports SSL connections
-with a config file options:
-
-  transport->server->sslport # the ssl port
-  transport->server->ssl  # is this ssl?
-
-=cut
-
-my $logger = "OpenSRF::Utils::Logger";
-
 sub DESTROY{
-	my $self = shift;
-	$self->disconnect;
+    shift()->disconnect;
 }
 
-sub disconnect{
-	my $self = shift;
-	my $socket = $self->{_socket};
-	if( $socket and $socket->connected() ) {
-		print $socket "</stream:stream>";
-		close( $socket );
-	}
-}
-
-
-=head2 new()
-
-Creates a new Client object.
-
-debug and log_file are not required if you don't care to log the activity, 
-however all other parameters are.
-
-%params:
-
-	username
-	resource	
-	password
-	debug	 
-	log_file
-
-=cut
-
 sub new {
-
 	my( $class, %params ) = @_;
-
-	$class = ref( $class ) || $class;
-
-	my $port			= $params{'port'}			|| return undef;
-	my $username	= $params{'username'}	|| return undef;
-	my $resource	= $params{'resource'}	|| return undef;
-	my $password	= $params{'password'}	|| return undef;
-	my $host			= $params{'host'}			|| return undef;
-
-	my $jid = "$username\@$host\/$resource";
-
-	my $self = bless {} => $class;
-
-	$self->jid( $jid );
-	$self->host( $host );
-	$self->port( $port );
-	$self->username( $username );
-	$self->resource( $resource );
-	$self->password( $password );
-	$self->{temp_buffer} = "";
-
-	$logger->transport( "Creating Client instance: $host:$port, $username, $resource",
-			$logger->INFO );
-
+    my $self = bless({}, ref($class) || $class);
+    $self->params(\%params);
 	return $self;
 }
 
-# clears the tmp buffer as well as the TCP buffer
-sub buffer_reset { 
 
-	my $self = shift;
-	$self->{temp_buffer} = ""; 
-
-	my $fh = $self->{_socket};
-	set_nonblock( $fh );
-	my $t_buf = "";
-	while( sysread( $fh, $t_buf, 4096 ) ) {} 
-	set_block( $fh );
+sub reader {
+    my($self, $reader) = @_;
+    $self->{reader} = $reader if $reader;
+    return $self->{reader};
 }
-# -------------------------------------------------
 
-=head2 gather()
-
-Gathers all Jabber messages sitting in the collection queue 
-and hands them each to their respective callbacks.  This call
-does not block (calls Process(0))
-
-=cut
-
-sub gather { my $self = shift; $self->process( 0 ); }
-
-# -------------------------------------------------
-
-=head2 listen()
-
-Blocks and gathers incoming messages as they arrive.  Does not return
-unless an error occurs.
-
-Throws an OpenSRF::EX::JabberException if the call to Process ever fails.
-
-=cut
-sub listen {
-	my $self = shift;
-
-	my $sock = $self->unix_sock();
-	my $socket = IO::Socket::UNIX->new( Peer => $sock  );
-	$logger->transport( "Unix Socket opened by Listener", INTERNAL );
-	
-	throw OpenSRF::EX::Socket( "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
-		unless ($socket->connected);
-		
-	while(1) {
-		my $o = $self->process( -1 );
-		$logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
-		if( ! defined( $o ) ) {
-			throw OpenSRF::EX::Jabber( "Listen Loop failed at 'process()'" );
-		}
-		print $socket $o;
-
-	}
-	throw OpenSRF::EX::Socket( "How did we get here?!?!" );
+sub params {
+    my($self, $params) = @_;
+    $self->{params} = $params if $params;
+    return $self->{params};
 }
 
-sub set_nonblock {
-	my $fh = shift;
-	my	$flags = fcntl($fh, F_GETFL, 0)
-		or die "Can't get flags for the socket: $!\n";
-
-	$logger->transport( "Setting NONBLOCK: original flags: $flags", INTERNAL );
-
-	fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
-		or die "Can't set flags for the socket: $!\n";
-
-	return $flags;
+sub socket {
+    my($self, $socket) = @_;
+    $self->{socket} = $socket if $socket;
+    return $self->{socket};
 }
 
-sub reset_fl {
-	my $fh = shift;
-	my $flags = shift;
-	$logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
-	fcntl($fh, F_SETFL, $flags) if defined $flags;
+sub disconnect {
+    my $self = shift;
+	$self->reader->disconnect if $self->reader;
 }
 
-sub set_block {
-	my $fh = shift;
 
-	my	$flags = fcntl($fh, F_GETFL, 0)
-		or die "Can't get flags for the socket: $!\n";
-
-	$flags &= ~O_NONBLOCK;
-
-	fcntl($fh, F_SETFL, $flags)
-		or die "Can't set flags for the socket: $!\n";
+sub gather { 
+    my $self = shift; 
+    $self->process( 0 ); 
 }
 
-
-sub timed_read {
-	my ($self, $timeout) = @_;
-    $timeout = defined($timeout) ? int($timeout) : undef;
-
-	$logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
-	if( $self->can( "app" ) ) {
-		$logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
-	}
-
-	# See if there is a complete message in the temp_buffer
-	# that we can return
-	if( $self->{temp_buffer} ) {
-		my $buffer = $self->{temp_buffer};
-		my $complete = 0;
-		$self->{temp_buffer} = '';
-
-		my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o);
-		$self->{last_tag} = $tag;
-		$logger->transport("Using tag: $tag  ", INTERNAL);
-
-		if ( $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
-			$buffer = $1;
-			$self->{temp_buffer} = $2;
-			$complete++;
-			$logger->transport( "completed read with $buffer", INTERNAL );
-		} elsif ( $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
-			$self->{temp_buffer} = $1;
-			$complete++;
-			$logger->transport( "completed read with $buffer", INTERNAL );
-		} else {
-			$self->{temp_buffer} = $buffer;
-		}
-				
-		if( $buffer and $complete ) {
-			return $buffer;
-		}
-
-	}
-	############
-
-	my $fh = $self->{_socket};
-
-	unless( $fh and $fh->connected ) {
-		throw OpenSRF::EX::Socket ("Attempted read on closed socket", ERROR );
-	}
-
-	$logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
-
-	my $flags;
-	if (defined($timeout) && !$timeout) {
-		$flags = set_nonblock( $fh );
-	}
-
-	$timeout ||= 0;
-	$logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );
-
-
-	my $complete = 0;
-	my $first_read = 1;
-	my $xml = '';
-	eval {
-		my $tag = '';
-		eval {
-			no warnings;
-			local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
-
-			# alarm needs a number greater => 1.
-			my $alarm_timeout = $timeout;
-			if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
-				$alarm_timeout = 1;
-			}
-			alarm $alarm_timeout;
-			do {	
-
-				my $buffer = $self->{temp_buffer};
-				$self->{temp_buffer} = '';
-				#####
-
-				# This code is no longer in use
-				#my $ff =  fcntl($fh, F_GETFL, 0);
-				#if ($ff == ($ff | O_NONBLOCK) and $timeout > 0 ) {
-					#throw OpenSRF::EX::ERROR ("File flags are set to NONBLOCK but timeout is $timeout", ERROR );
-				#}
-
-				my $t_buf = "";
-				my $read_size = 1024; my $f = 0;
-				while( my $n = sysread( $fh, $t_buf, $read_size ) ) {
-
-					unless( $fh->connected ) {
-						OpenSRF::EX::JabberDisconnected->throw(
-							"Lost jabber client in timed_read()");
-					}
-
-					# XXX Change me to debug/internal at some point, this is for testing...
-					# XXX Found a race condition where reading >= $read_size bytes of data
-					# will fail if the log line below is removed.
-					$logger->info("timed_read() read $n bytes of data");
-
-
-					$buffer .= $t_buf;
-					if( $n < $read_size ) {
-						#reset_fl( $fh, $f ) if $f;
-						set_block( $fh );
-						last;
-					}
-					# see if there is any more data to grab...
-					$f = set_nonblock( $fh );
-				}
-
-				#sysread($fh, $buffer, 2048, length($buffer) );
-				#sysread( $fh, $t_buf, 2048 );
-				#$buffer .= $t_buf;
-
-				#####
-				$logger->transport(" Got [$buffer] from the socket", INTERNAL);
-
-				if ($first_read) {
-					$logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
-					($tag) = ($buffer =~ /<([^\s\?\>\/]+){1}/o);
-					$self->{last_tag} = $tag;
-					$first_read--;
-					$logger->transport("Using tag: $tag  ", INTERNAL);
-				}
-
-				if (!$first_read && $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
-					$buffer = $1;
-					$self->{temp_buffer} = $2;
-					$complete++;
-					$logger->transport( "completed read with $buffer", INTERNAL );
-				} elsif (!$first_read && $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
-					$self->{temp_buffer} = $1;
-					$complete++;
-					$logger->transport( "completed read with $buffer", INTERNAL );
-				}
-				
-				$xml .= $buffer;
-
-			} while (!$complete && $xml);
-			alarm(0);
-		};
-		alarm(0);
-	};
-
-	$logger->transport( "XML Read: $xml", INTERNAL );
-	#reset_fl( $fh, $flags) if defined $flags;
-	set_block( $fh ) if defined $flags;
-
-	if ($complete) {
-		return $xml;
-	}
-	if( $@ ) {
-		return undef;
-	}
-	return "";
-}
-
-
 # -------------------------------------------------
 
 sub tcp_connected {
-
 	my $self = shift;
-	return 1 if ($self->{_socket} and $self->{_socket}->connected);
-	return 0;
+    return $self->reader->tcp_connected if $self->reader;
+    return 0;
 }
 
-sub password {
-	my( $self, $password ) = @_;
-	$self->{'oils:password'} = $password if $password;
-	return $self->{'oils:password'};
-}
 
-# -------------------------------------------------
 
-sub username {
-	my( $self, $username ) = @_;
-	$self->{'oils:username'} = $username if $username;
-	return $self->{'oils:username'};
-}
-	
-# -------------------------------------------------
-
-sub resource {
-	my( $self, $resource ) = @_;
-	$self->{'oils:resource'} = $resource if $resource;
-	return $self->{'oils:resource'};
-}
-
-# -------------------------------------------------
-
-sub jid {
-	my( $self, $jid ) = @_;
-	$self->{'oils:jid'} = $jid if $jid;
-	return $self->{'oils:jid'};
-}
-
-sub port {
-	my( $self, $port ) = @_;
-	$self->{'oils:port'} = $port if $port;
-	return $self->{'oils:port'};
-}
-
-sub host {
-	my( $self, $host ) = @_;
-	$self->{'oils:host'} = $host if $host;
-	return $self->{'oils:host'};
-}
-
-# -------------------------------------------------
-
-=head2 send()
-
-	Sends a Jabber message.
-	
-	%params:
-		to			- The JID of the recipient
-		thread	- The Jabber thread
-		body		- The body of the message
-
-=cut
-
 sub send {
 	my $self = shift;
-	my %params = @_;
-
-	my $to = $params{'to'} || return undef;
-	my $body = $params{'body'} || return undef;
-	my $thread = $params{'thread'} || "";
-	my $router_command = $params{'router_command'} || "";
-	my $router_class = $params{'router_class'} || "";
-
-	my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;
-
-	$msg->setTo( $to );
-	$msg->setThread( $thread ) if $thread;
-	$msg->setBody( $body );
-	$msg->set_router_command( $router_command );
-	$msg->set_router_class( $router_class );
-    $msg->set_osrf_xid($logger->get_osrf_xid);
-
-	$logger->transport( 
-			"JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );
-
-	my $soc = $self->{_socket};
-	unless( $soc and $soc->connected ) {
-		throw OpenSRF::EX::Jabber ("No longer connected to jabber server");
-	}
-	print $soc $msg->toString;
-
-	$logger->transport( 
-			"JabberClient Sent message to $to with thread $thread and body: \n$body", INTERNAL );
+    my $msg = OpenSRF::Transport::SlimJabber::XMPPMessage->new(@_);
+    $self->reader->send($msg->to_xml);
 }
 
-
-=head2 inintialize()
-
-Connect to the server and log in.  
-
-Throws an OpenSRF::EX::JabberException if we cannot connect
-to the server or if the authentication fails.
-
-=cut
-
-# --- The logging lines have been commented out until we decide 
-# on which log files we're using.
-
 sub initialize {
 
 	my $self = shift;
 
-	my $jid		= $self->jid; 
-	my $host	= $self->host; 
-	my $port	= $self->port; 
-	my $username	= $self->username;
-	my $resource	= $self->resource;
-	my $password	= $self->password;
+	my $host	= $self->params->{host}; 
+	my $port	= $self->params->{port}; 
+	my $username	= $self->params->{username};
+	my $resource	= $self->params->{resource};
+	my $password	= $self->params->{password};
 
-	my $stream = <<"	XML";
-<stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
-	XML
+    my $jid = "$username\@$host/$resource";
 
 	my $conf = OpenSRF::Utils::Config->current;
+
 	my $tail = "_$$";
-	if(!$conf->bootstrap->router_name && $username eq "router") {
-		$tail = "";
-	}
+	$tail = "" if !$conf->bootstrap->router_name and $username eq "router";
+    $resource = "$resource$tail";
 
-	my $auth = <<"	XML";
-<iq id='123' type='set'>
-<query xmlns='jabber:iq:auth'>
-<username>$username</username>
-<password>$password</password>
-<resource>${resource}$tail</resource>
-</query>
-</iq>
-	XML
+    my $socket = IO::Socket::INET->new(
+        PeerHost => $host,
+        PeerPort => $port,
+        Peer => $port,
+        Proto  => 'tcp' );
 
-	my $sock_type = 'IO::Socket::INET';
-	
-	# if port is a string, then we're connecting to a UNIX socket
-	unless( $port =~ /^\d+$/ ) {
-		$sock_type = 'IO::Socket::UNIX';
-	}
+    throw OpenSRF::EX::Jabber("Could not open TCP socket to Jabber server: $!")
+	    unless ( $socket and $socket->connected );
 
-	# --- 5 tries to connect to the jabber server
-	my $socket;
-	for(1..5) {
-		$socket = $sock_type->new( PeerHost => $host,
-					   PeerPort => $port,
-					   Peer => $port,
-					   Proto    => 'tcp' );
-		$logger->debug( "$jid: $_ connect attempt to $host:$port");
-		last if ( $socket and $socket->connected );
-		$logger->warn( "$jid: Failed to connect to server...$host:$port (Try # $_)");
-		sleep 3;
-	}
+    $self->socket($socket);
+    $self->reader(OpenSRF::Transport::SlimJabber::XMPPReader->new($socket));
+    $self->reader->connect($host, $username, $password, $resource);
 
-	unless ( $socket and $socket->connected ) {
-		throw OpenSRF::EX::Jabber( " Could not connect to Jabber server: $!" );
-	}
+    throw OpenSRF::EX::Jabber("Could not authenticate with Jabber server: $!")
+	    unless ( $self->reader->connected );
 
-	$logger->transport( "Logging into jabber as $jid " .
-			"from " . ref( $self ), DEBUG );
-
-	print $socket $stream;
-
-	my $buffer;
-	eval {
-		eval {
-			local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
-			alarm 3;
-			sysread($socket, $buffer, 4096);
-			$logger->transport( "Login buffer 1: $buffer", INTERNAL );
-			alarm(0);
-		};
-		alarm(0);
-	};
-
-	print $socket $auth;
-
-	if( $socket and $socket->connected() ) {
-		$self->{_socket} = $socket;
-	} else {
-		throw OpenSRF::EX::Jabber( " ** Unable to connect to Jabber server", ERROR );
-	}
-
-
-	$buffer = $self->timed_read(10);
-
-	if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}
-
-	if( $buffer and $buffer =~ /type=["\']result["\']/ ) { 
-		$logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
-	} else {
-		if( !$buffer ) { $buffer = " "; }
-		$socket->close;
-		throw OpenSRF::EX::Jabber( " * $jid: Unable to authenticate: $buffer", ERROR );
-	}
-
 	return $self;
 }
 
+
 sub construct {
 	my( $class, $app ) = @_;
-	$logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );
-	$class->peer_handle( 
-			$class->new( $app )->initialize() );
+	$class->peer_handle($class->new( $app )->initialize());
 }
 
+
 sub process {
+	my($self, $timeout) = @_;
 
-	my( $self, $timeout ) = @_;
-
 	$timeout ||= 0;
     $timeout = int($timeout);
-	undef $timeout if ( $timeout < 0 );
 
-	unless( $self->{_socket}->connected ) {
-		OpenSRF::EX::JabberDisconnected->throw( 
-		  "This JabberClient instance is no longer connected to the server " . 
-		  $self->username . " : " . $self->resource, ERROR );
+	unless( $self->reader and $self->reader->connected ) {
+        throw OpenSRF::EX::JabberDisconnected 
+            ("This JabberClient instance is no longer connected to the server ");
 	}
 
-	my $val = $self->timed_read( $timeout );
-
-	$timeout = "FOREVER" unless ( defined $timeout );
-	
-	if ( ! defined( $val ) ) {
-		OpenSRF::EX::Jabber->throw( 
-		  "Call to Client->timed_read( $timeout ) failed", ERROR );
-	} elsif ( ! $val ) {
-		$logger->transport( 
-			"Call to Client->timed_read( $timeout ) returned 0 bytes of data", INTERNAL );
-	} elsif ( $val ) {
-		$logger->transport( 
-			"Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
-	}
-
-	my $t = $self->{last_tag};
-
-	if( $t and $val ) {
-		my @msgs = $val =~ /(<$t[^>]*>.*?<\/$t>)/g;
-		$val = shift(@msgs);
-	
-		if (@msgs) {
-			my $tmp = $self->{temp_buffer};
-	
-			$self->{temp_buffer} = '';
-			$self->{temp_buffer} .= $_ for (@msgs);
-			$self->{temp_buffer} .= $tmp;
-		}
-	}
-
-	return $val;
+    return $self->reader->wait_msg($timeout);
 }
 
 
@@ -599,34 +130,10 @@
 # Returns 1 on success, 0 if the socket isn't connected
 # --------------------------------------------------------------
 sub flush_socket {
-
 	my $self = shift;
-	my $socket = $self->{_socket};
-
-	if( $socket ) {
-
-		my $buf;
-		my	$flags = fcntl($socket, F_GETFL, 0);
-
-		fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
-		while( my $n = sysread( $socket, $buf, 8192 ) ) {
-			$logger->debug("flush_socket dropped $n bytes of data");
-			if(!$socket->connected()) {
-				$logger->error("flush_socket dropped data on disconnected socket: $buf");
-			}
-		}
-		fcntl($socket, F_SETFL, $flags);
-
-		return 0 unless $socket->connected();
-
-		return 1;
-
-	} else {
-
-		return 0;
-	}
+    return $self->reader->flush_socket;
 }
 
+1;
 
 
-1;

Modified: trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Inbound.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Inbound.pm	2008-03-31 20:35:20 UTC (rev 1295)
+++ trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Inbound.pm	2008-03-31 20:47:41 UTC (rev 1296)
@@ -6,6 +6,7 @@
 use OpenSRF::Utils::SettingsClient;
 use OpenSRF::Utils::Config;
 use Time::HiRes qw/usleep/;
+use FreezeThaw qw/freeze/;
 
 my $logger = "OpenSRF::Utils::Logger";
 
@@ -20,9 +21,6 @@
 
 =cut
 
-# XXX This will be overhauled to connect as a component instead of as
-# a user.  all in good time, though.
-
 {
 	my $unix_sock;
 	sub unix_sock { return $unix_sock; }
@@ -98,7 +96,6 @@
         $logger->info("loading router info $routers");
 
         for my $router (@$routers) {
-
             if(ref $router) {
                 if( !$router->{services} || grep { $_ eq $self->{app} } @{$router->{services}->{service}} ) {
                     my $name = $router->{name};
@@ -134,10 +131,8 @@
 		$logger->debug("Inbound listener calling process()");
 
 		try {
-			$o = $self->process( -1 );
+			$o = $self->process(-1);
 
-			$logger->debug("Inbound listener received ".length($o)." bytes of data");
-
 			if(!$o){
 				$logger->error(
 					"Inbound received no data from the Jabber socket in process()");
@@ -158,7 +153,7 @@
 			throw OpenSRF::EX::Socket( 
 				"Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
 				unless ($socket->connected);
-			print $socket $o;
+			print $socket freeze($o);
 			$socket->close;
 		} 
 	}

Modified: trunk/src/perlmods/OpenSRF/Transport/SlimJabber/MessageWrapper.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport/SlimJabber/MessageWrapper.pm	2008-03-31 20:35:20 UTC (rev 1295)
+++ trunk/src/perlmods/OpenSRF/Transport/SlimJabber/MessageWrapper.pm	2008-03-31 20:47:41 UTC (rev 1296)
@@ -1,125 +1,72 @@
 package OpenSRF::Transport::SlimJabber::MessageWrapper;
-use XML::LibXML;
-use OpenSRF::EX qw/:try/;
-use OpenSRF::Utils::Logger qw/$logger/;
+use strict; use warnings;
+use OpenSRF::Transport::SlimJabber::XMPPMessage;
 
+# ----------------------------------------------------------
+# Legacy wrapper for XMPPMessage
+# ----------------------------------------------------------
+
 sub new {
 	my $class = shift;
-	$class = ref($class) || $class;
+    my $msg = shift;
+    return bless({msg => $msg}, ref($class) || $class);
+}
 
-	my $xml = shift;
-
-	my ($doc, $msg);
-	if ($xml) {
-		my $err;
-
-		try {
-			$doc = XML::LibXML->new->parse_string($xml);
-		} catch Error with {
-			$err = shift; 
-			warn "MessageWrapper received bad XML : error = $err\nXML = $xml\n";
-			$logger->error("MessageWrapper received bad XML : error = $err : XML = $xml");
-		};
-		throw $err if $err;
-
-		$msg = $doc->documentElement;
-	} else {
-		$doc = XML::LibXML::Document->createDocument;
-		$msg = $doc->createElement( 'message' );
-		$doc->setDocumentElement( $msg );
-	}
-
-	
-	my $self = { msg_node => $msg };
-
-	return bless $self => $class;
+sub msg {
+    my($self, $msg) = @_;
+    $self->{msg} = $msg if $msg;
+    return $self->{msg};
 }
 
 sub toString {
-	my $self = shift;
-	if( $self->{msg_node} ) {
-		return $self->{msg_node}->toString(@_);
-	}
-	return "";
+    return $_[0]->msg->to_xml;
 }
 
 sub get_body {
-	my $self = shift;
-	my ($t_body) = grep {$_->nodeName eq 'body'} $self->{msg_node}->childNodes;
-	if( $t_body ) {
-		my $body = $t_body->textContent;
-		return $body;
-	}
-	return "";
+    return $_[0]->msg->body;
 }
 
 sub get_sess_id {
-	my $self = shift;
-	my ($t_node) = grep {$_->nodeName eq 'thread'} $self->{msg_node}->childNodes;
-	if( $t_node ) {
-		return $t_node->textContent;
-	}
-	return "";
+    return $_[0]->msg->thread;
 }
 
 sub get_msg_type {
-	my $self = shift;
-	$self->{msg_node}->getAttribute( 'type' );
+    return $_[0]->msg->type;
 }
 
 sub get_remote_id {
-	my $self = shift;
-
-	#
-	my $rid = $self->{msg_node}->getAttribute( 'router_from' );
-	return $rid if $rid;
-
-	return $self->{msg_node}->getAttribute( 'from' );
+    return $_[0]->msg->from;
 }
 
 sub setType {
-	my $self = shift;
-	$self->{msg_node}->setAttribute( type => shift );
+    $_[0]->msg->type(shift());
 }
 
 sub setTo {
-	my $self = shift;
-	$self->{msg_node}->setAttribute( to => shift );
+    $_[0]->msg->to(shift());
 }
 
 sub setThread {
-	my $self = shift;
-	$self->{msg_node}->appendTextChild( thread => shift );
+    $_[0]->msg->thread(shift());
 }
 
 sub setBody {
-	my $self = shift;
-	my $body = shift;
-	$self->{msg_node}->appendTextChild( body => $body );
+    $_[0]->msg->body(shift());
 }
 
 sub set_router_command {
-	my( $self, $router_command ) = @_;
-	if( $router_command ) {
-		$self->{msg_node}->setAttribute( router_command => $router_command );
-	}
+    $_[0]->msg->router_command(shift());
 }
 sub set_router_class {
-	my( $self, $router_class ) = @_;
-	if( $router_class ) {
-		$self->{msg_node}->setAttribute( router_class => $router_class );
-	}
+    $_[0]->msg->router_class(shift());
 }
 
 sub set_osrf_xid {
-   my( $self, $xid ) = @_;
-   $self->{msg_node}->setAttribute( osrf_xid => $xid );
+    $_[0]->msg->osrf_xid(shift());
 }
 
-
 sub get_osrf_xid {
-   my $self = shift;
-   $self->{msg_node}->getAttribute('osrf_xid');
+   return $_[0]->msg->osrf_xid;
 }
 
 1;

Modified: trunk/src/perlmods/OpenSRF/Transport/SlimJabber/PeerConnection.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport/SlimJabber/PeerConnection.pm	2008-03-31 20:35:20 UTC (rev 1295)
+++ trunk/src/perlmods/OpenSRF/Transport/SlimJabber/PeerConnection.pm	2008-03-31 20:47:41 UTC (rev 1296)
@@ -29,15 +29,9 @@
 sub retrieve { 
 	my( $class, $app ) = @_;
 	return $_singleton_connection;
-#	my @keys = keys %apps_hash;
-#	OpenSRF::Utils::Logger->transport( 
-#			"Requesting peer for $app and we have @keys", INFO );
-#	return $apps_hash{$app};
 }
 
 
-
-# !! In here we use the bootstrap config ....
 sub new {
 	my( $class, $app ) = @_;
 
@@ -63,12 +57,6 @@
 
 	if( $app eq "client" ) { $resource = "client_at_$h"; }
 
-#	unless ( $conf->bootstrap->router_name ) {
-#		$username = 'router';
-#		$resource = $app;
-#	}
-
-
 	OpenSRF::EX::Config->throw( "JPeer could not load all necesarry values from config" )
 		unless ( $username and $password and $resource and $host and $port );
 
@@ -94,26 +82,16 @@
 }
 
 sub process {
-
 	my $self = shift;
 	my $val = $self->SUPER::process(@_);
 	return 0 unless $val;
-
-	OpenSRF::Utils::Logger->transport( "Calling transport handler for ".$self->app." with: $val", INTERNAL );
-	my $t;
-	$t = OpenSRF::Transport->handler($self->app, $val);
-
-	return $t;
+	return OpenSRF::Transport->handler($self->app, $val);
 }
 
 sub app {
 	my $self = shift;
 	my $app = shift;
-	if( $app ) {
-		OpenSRF::Utils::Logger->transport( "PEER changing app to $app: ".$self->jid, INTERNAL );
-	}
-
-	$self->{app} = $app if ($app);
+	$self->{app} = $app if $app;
 	return $self->{app};
 }
 

Added: trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm	                        (rev 0)
+++ trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm	2008-03-31 20:47:41 UTC (rev 1296)
@@ -0,0 +1,134 @@
+package OpenSRF::Transport::SlimJabber::XMPPMessage;
+use strict; use warnings;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::EX qw/:try/;
+use strict; use warnings;
+use XML::LibXML;
+
+use constant JABBER_MESSAGE =>
+    "<message to='%s' from='%s' router_command='%s' router_class='%s' osrf_xid='%s'>".
+    "<thread>%s</thread><body>%s</body></message>";
+
+sub new {
+    my $class = shift;
+    my %args = @_;
+    my $self = bless({}, $class);
+
+    if($args{xml}) {
+        $self->parse_xml($args{xml});
+
+    } else {
+        $self->{to} = $args{to} || '';
+        $self->{from} = $args{from} || '';
+        $self->{thread} = $args{thread} || '';
+        $self->{body} = $args{body} || '';
+        $self->{osrf_xid} = $args{osrf_xid} || '';
+        $self->{router_command} = $args{router_command} || '';
+        $self->{router_class} = $args{router_class} || '';
+    }
+
+    return $self;
+}
+
+sub to {
+    my($self, $to) = @_;
+    $self->{to} = $to if defined $to;
+    return $self->{to};
+}
+sub from {
+    my($self, $from) = @_;
+    $self->{from} = $from if defined $from;
+    return $self->{from};
+}
+sub thread {
+    my($self, $thread) = @_;
+    $self->{thread} = $thread if defined $thread;
+    return $self->{thread};
+}
+sub body {
+    my($self, $body) = @_;
+    $self->{body} = $body if defined $body;
+    return $self->{body};
+}
+sub status {
+    my($self, $status) = @_;
+    $self->{status} = $status if defined $status;
+    return $self->{status};
+}
+sub type {
+    my($self, $type) = @_;
+    $self->{type} = $type if defined $type;
+    return $self->{type};
+}
+sub err_type {
+    my($self, $err_type) = @_;
+    $self->{err_type} = $err_type if defined $err_type;
+    return $self->{err_type};
+}
+sub err_code {
+    my($self, $err_code) = @_;
+    $self->{err_code} = $err_code if defined $err_code;
+    return $self->{err_code};
+}
+sub osrf_xid {
+    my($self, $osrf_xid) = @_;
+    $self->{osrf_xid} = $osrf_xid if defined $osrf_xid;
+    return $self->{osrf_xid};
+}
+sub router_command {
+    my($self, $router_command) = @_;
+    $self->{router_command} = $router_command if defined $router_command;
+    return $self->{router_command};
+}
+sub router_class {
+    my($self, $router_class) = @_;
+    $self->{router_class} = $router_class if defined $router_class;
+    return $self->{router_class};
+}
+
+
+sub to_xml {
+    my $self = shift;
+
+    my $body = $self->{body};
+    $body =~ s/&/&amp;/sog;
+    $body =~ s/</&lt;/sog;
+    $body =~ s/>/&gt;/sog;
+
+    return sprintf(
+        JABBER_MESSAGE,
+        $self->{to},
+        $self->{from},
+        $self->{router_command},
+        $self->{router_class},
+        $self->{osrf_xid},
+        $self->{thread},
+        $body
+    );
+}
+
+sub parse_xml {
+    my($self, $xml) = @_;
+    my($doc, $err);
+
+    try {
+        $doc = XML::LibXML->new->parse_string($xml);
+    } catch Error with {
+        my $err = shift;
+        $logger->error("Error parsing message xml: $xml --- $err");
+    };
+    throw $err if $err;
+
+    my $root = $doc->documentElement;
+
+    $self->{body} = $root->findnodes('/message/body').'';
+    $self->{thread} = $root->findnodes('/message/thread').'';
+    $self->{from} = $root->getAttribute('router_from');
+    $self->{from} = $root->getAttribute('from') unless $self->{from};
+    $self->{to} = $root->getAttribute('to');
+    $self->{type} = $root->getAttribute('type');
+    $self->{osrf_xid} = $root->getAttribute('osrf_xid');
+}
+
+
+1;

Added: trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm	                        (rev 0)
+++ trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm	2008-03-31 20:47:41 UTC (rev 1296)
@@ -0,0 +1,350 @@
+package OpenSRF::Transport::SlimJabber::XMPPReader;
+use strict; use warnings;
+use XML::Parser;
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+use Time::HiRes qw/time/;
+use OpenSRF::Transport::SlimJabber::XMPPMessage;
+use OpenSRF::Utils::Logger qw/$logger/;
+
+# -----------------------------------------------------------
+# Connect, disconnect, and authentication message templates
+# -----------------------------------------------------------
+use constant JABBER_CONNECT =>   # sprintf template; argument: domain
+    "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
+
+use constant JABBER_BASIC_AUTH =>   # sprintf template; arguments: username, password, resource
+    "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" .
+    "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
+
+use constant JABBER_DISCONNECT => "</stream:stream>";   # closes the element opened by JABBER_CONNECT
+
+
+# -----------------------------------------------------------
+# XMPP Stream states
+# -----------------------------------------------------------
+use constant DISCONNECTED   => 1;   # initial state; also set on <stream:error>, <error> and </stream:stream>
+use constant CONNECT_RECV   => 2;   # a <stream:stream> opening tag has been parsed
+use constant CONNECTED      => 3;   # an <iq type='result'> has been parsed
+
+
+# -----------------------------------------------------------
+# XMPP Message states
+# -----------------------------------------------------------
+use constant IN_NOTHING => 1;   # initial state; restored at every element end
+use constant IN_BODY    => 2;   # inside <body>: character data is appended to the message body
+use constant IN_THREAD  => 3;   # inside <thread>: character data is appended to the message thread
+use constant IN_STATUS  => 4;   # inside <status>: character data is appended to the message status
+
+
+# -----------------------------------------------------------
+# Constructor, getter/setters
+# -----------------------------------------------------------
+sub new {
+    # Builds a reader around $socket with an empty message queue, the
+    # DISCONNECTED / IN_NOTHING initial states, and a push parser whose
+    # handlers feed this object.
+    my $class = shift;
+    my $socket = shift;
+
+    my $self = bless({}, $class);
+
+    $self->{queue} = [];
+    $self->{stream_state} = DISCONNECTED;
+    $self->{xml_state} = IN_NOTHING;
+    $self->socket($socket);
+
+    my $p = XML::Parser->new(Handlers => {
+        Start => \&start_element,
+        End   => \&end_element,
+        Char  => \&characters,
+    });
+
+    $self->parser($p->parse_start); # create a push parser
+
+    # The handlers reach this object through the parser's _parent_
+    # slot.  Since the reader also holds the parser, a strong reference
+    # here forms a cycle that is never reclaimed, so weaken it.
+    require Scalar::Util;
+    $self->parser->{_parent_} = $self;
+    Scalar::Util::weaken($self->parser->{_parent_});
+
+    $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
+    return $self;
+}
+
+# Appends a message to the end of the queue; false values are ignored.
+sub push_msg {
+    my($self, $incoming) = @_;
+    return $incoming unless $incoming;
+    return push(@{ $self->{queue} }, $incoming);
+}
+
+# Removes and returns the oldest queued message (undef when the queue
+# is empty).
+sub next_msg {
+    my($self) = @_;
+    return shift @{ $self->{queue} };
+}
+
+# Returns true when at least one message is waiting in the queue.
+sub peek_msg {
+    my($self) = @_;
+    my $pending = @{ $self->{queue} };
+    return ($pending > 0);
+}
+
+# Getter/setter for the push parser; only a true argument replaces the
+# stored value.
+sub parser {
+    my $self = shift;
+    my $new_parser = shift;
+    if($new_parser) {
+        $self->{parser} = $new_parser;
+    }
+    return $self->{parser};
+}
+
+# Getter/setter for the underlying socket; only a true argument
+# replaces the stored value.
+sub socket {
+    my $self = shift;
+    my $new_socket = shift;
+    if($new_socket) {
+        $self->{socket} = $new_socket;
+    }
+    return $self->{socket};
+}
+
+# Getter/setter for the stream state; only a true argument replaces
+# the stored value.
+sub stream_state {
+    my $self = shift;
+    my $new_state = shift;
+    if($new_state) {
+        $self->{stream_state} = $new_state;
+    }
+    return $self->{stream_state};
+}
+
+# Getter/setter for the message parsing state; only a true argument
+# replaces the stored value.
+sub xml_state {
+    my $self = shift;
+    my $new_state = shift;
+    if($new_state) {
+        $self->{xml_state} = $new_state;
+    }
+    return $self->{xml_state};
+}
+
+# Getter/setter for the message currently being assembled; only a true
+# argument replaces the stored value.
+sub message {
+    my $self = shift;
+    my $new_message = shift;
+    if($new_message) {
+        $self->{message} = $new_message;
+    }
+    return $self->{message};
+}
+
+
+# -----------------------------------------------------------
+# Stream and connection handling methods
+# -----------------------------------------------------------
+
+sub connect {
+    # Opens the XMPP stream to $domain and authenticates with the given
+    # credentials.  Each step waits up to 10 seconds for the server.
+    # Returns 1 on success, 0 (after logging an error) on failure.
+    my($self, $domain, $username, $password, $resource) = @_;
+
+    # These values are interpolated into XML attribute and text
+    # content, so entity-escape them; a raw &, < or quote (e.g. in a
+    # password) would otherwise produce a malformed stream.
+    my %entity_for = (
+        '&' => '&amp;',
+        '<' => '&lt;',
+        '>' => '&gt;',
+        "'" => '&apos;',
+        '"' => '&quot;',
+    );
+    ($domain, $username, $password, $resource) = map {
+        my $value = defined $_ ? $_ : '';
+        $value =~ s/([&<>'"])/$entity_for{$1}/g;
+        $value;
+    } ($domain, $username, $password, $resource);
+
+    $self->send(sprintf(JABBER_CONNECT, $domain));
+    $self->wait(10);
+
+    unless($self->{stream_state} == CONNECT_RECV) {
+        $logger->error("No initial XMPP response from server");
+        return 0;
+    }
+
+    $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource));
+    $self->wait(10);
+
+    unless($self->connected) {
+        $logger->error('XMPP connect failed');
+        return 0;
+    }
+
+    return 1;
+}
+
+# Sends the stream-closing tag, then shuts down both directions of the
+# socket and closes it.
+sub disconnect {
+    my($self) = @_;
+    $self->send(JABBER_DISCONNECT);
+    my $sock = $self->socket;
+    shutdown($sock, 2);
+    close($sock);
+}
+
+# -----------------------------------------------------------
+# returns true if this stream is connected to the server
+# -----------------------------------------------------------
+sub connected {
+    my($self) = @_;
+    # a dead socket short-circuits the stream state check
+    my $tcp_up = $self->tcp_connected;
+    return $tcp_up unless $tcp_up;
+    return $self->{stream_state} == CONNECTED;
+}
+
+# -----------------------------------------------------------
+# returns true if the socket is connected
+# -----------------------------------------------------------
+sub tcp_connected {
+    my($self) = @_;
+    # no socket at all counts as not connected
+    my $sock = $self->socket;
+    return $sock unless $sock;
+    return $self->socket->connected;
+}
+
+# -----------------------------------------------------------
+# sends pre-formatted XML
+# -----------------------------------------------------------
+sub send {
+    my $self = shift;
+    my $xml = shift;
+    # written verbatim: no escaping or validation happens here
+    return $self->{socket}->print($xml);
+}
+
+# -----------------------------------------------------------
+# Puts a file handle into blocking mode
+# -----------------------------------------------------------
+sub set_block {
+    my($handle) = @_;
+    # read the current flags, clear O_NONBLOCK, write them back
+    my $current = fcntl($handle, F_GETFL, 0);
+    my $blocking = $current & ~O_NONBLOCK;
+    fcntl($handle, F_SETFL, $blocking);
+}
+
+
+# -----------------------------------------------------------
+# Puts a file handle into non-blocking mode
+# -----------------------------------------------------------
+sub set_nonblock {
+    my($handle) = @_;
+    # read the current flags, add O_NONBLOCK, write them back
+    my $current = fcntl($handle, F_GETFL, 0);
+    my $nonblocking = $current | O_NONBLOCK;
+    fcntl($handle, F_SETFL, $nonblocking);
+}
+
+
+# Returns the next queued message, reading from the socket if the
+# queue is empty.  Waits up to $timeout seconds for data to become
+# readable (a negative timeout waits indefinitely, and 0 or undef
+# polls), feeds whatever arrives to the push parser, and returns the
+# first completed message, or undef if none is available yet.
+sub wait {
+    my($self, $timeout) = @_;
+
+    return $self->next_msg if $self->peek_msg;
+
+    $timeout ||= 0;
+    $timeout = undef if $timeout < 0;   # undef makes select() block
+    my $socket = $self->{socket};
+
+    set_block($socket);
+
+    # build the select readset
+    my $infile = '';
+    vec($infile, $socket->fileno, 1) = 1;
+
+    # select() returns -1 on error (e.g. when interrupted by a signal);
+    # treating that as "readable" would send us into a blocking
+    # sysread that ignores the timeout.
+    my $nfound = select($infile, undef, undef, $timeout);
+    return undef unless defined $nfound and $nfound > 0;
+
+    # now slurp the data off the socket
+    my $buf;
+    my $read_size = 1024;
+    while(my $n = sysread($socket, $buf, $read_size)) {
+        # $n > 0 here, so $buf holds data.  Testing the truth of $buf
+        # instead would silently discard a chunk consisting of "0".
+        $self->{parser}->parse_more($buf);
+        last if $n < $read_size or $self->peek_msg;
+
+        # a full buffer suggests more is pending; drain it without blocking
+        set_nonblock($socket);
+    }
+
+    # Restore blocking mode on every exit path, including the one where
+    # a non-blocking sysread returned 0/undef and ended the loop.
+    set_block($socket);
+
+    return $self->next_msg;
+}
+
+# -----------------------------------------------------------
+# Blocks for at most timeout seconds until a complete XMPP
+# message is available and returns it; returns undef when the
+# time runs out.  A negative timeout means wait forever, and
+# an undefined timeout is treated as 0.
+# -----------------------------------------------------------
+sub wait_msg {
+    my($self, $timeout) = @_;
+    $timeout = 0 unless defined $timeout;
+
+    if($timeout < 0) {
+        # no deadline: keep asking until a message shows up
+        for(;;) {
+            my $msg = $self->wait($timeout);
+            return $msg if $msg;
+        }
+    }
+
+    # bounded wait: charge the time spent in each attempt against the
+    # remaining budget
+    my $remaining = $timeout;
+    while($remaining >= 0) {
+        my $began = time;
+        my $msg = $self->wait($remaining);
+        return $msg if $msg;
+        $remaining -= time - $began;
+    }
+
+    return undef;
+}
+
+
+# -----------------------------------------------------------
+# SAX Handlers
+# -----------------------------------------------------------
+
+
+# Parser "Start" handler.  Copies <message> attributes onto the message
+# under construction, notes which element's character data should be
+# collected next, and updates the stream state for stream-level tags.
+sub start_element {
+    my($parser, $name, %attrs) = @_;
+    my $reader = $parser->{_parent_};
+
+    if($name eq 'body') {
+        $reader->{xml_state} = IN_BODY;
+
+    } elsif($name eq 'thread') {
+        $reader->{xml_state} = IN_THREAD;
+
+    } elsif($name eq 'status') {
+        $reader->{xml_state} = IN_STATUS;
+
+    } elsif($name eq 'message') {
+        my $pending = $reader->{message};
+        $pending->{to} = $attrs{to};
+        $pending->{osrf_xid} = $attrs{osrf_xid};
+        $pending->{type} = $attrs{type};
+
+        # router_from, when set, takes precedence over from
+        if($attrs{router_from}) {
+            $pending->{from} = $attrs{router_from};
+        }
+        unless($pending->{from}) {
+            $pending->{from} = $attrs{from};
+        }
+
+    } elsif($name eq 'stream:stream') {
+        $reader->{stream_state} = CONNECT_RECV;
+
+    } elsif($name eq 'iq') {
+        my $iq_type = $attrs{type};
+        if($iq_type and $iq_type eq 'result') {
+            $reader->{stream_state} = CONNECTED;
+        }
+
+    } elsif($name eq 'stream:error') {
+        $reader->{stream_state} = DISCONNECTED;
+
+    } elsif($name eq 'error') {
+        my $pending = $reader->{message};
+        $pending->{err_type} = $attrs{type};
+        $pending->{err_code} = $attrs{code};
+        $reader->{stream_state} = DISCONNECTED;
+    }
+}
+
+# Parser "Char" handler.  Appends character data to the body, thread or
+# status of the message under construction, depending on the current
+# xml_state; text seen in any other state is ignored.
+sub characters {
+    my($parser, $chars) = @_;
+    my $reader = $parser->{_parent_};
+    my $state = $reader->{xml_state};
+    my $pending = $reader->{message};
+
+    if($state == IN_BODY) {
+        $pending->{body} .= $chars;
+        return;
+    }
+
+    if($state == IN_THREAD) {
+        $pending->{thread} .= $chars;
+        return;
+    }
+
+    if($state == IN_STATUS) {
+        $pending->{status} .= $chars;
+    }
+}
+
+# Parser "End" handler.  Any closing tag resets xml_state; </message>
+# queues the finished message and starts a fresh one; </stream:stream>
+# marks the stream as disconnected.
+sub end_element {
+    my($parser, $name) = @_;
+    my $reader = $parser->{_parent_};
+    $reader->{xml_state} = IN_NOTHING;
+
+    if($name eq 'stream:stream') {
+        $reader->{stream_state} = DISCONNECTED;
+
+    } elsif($name eq 'message') {
+        my $finished = $reader->{message};
+        $reader->push_msg($finished);
+        $reader->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
+    }
+}
+
+# Reads and discards everything currently readable on the socket,
+# without handing it to the parser.  Returns 1 if the socket is still
+# connected afterwards, 0 otherwise (including when there is no
+# connected socket to begin with).
+sub flush_socket {
+    my $self = shift;
+    my $sock = $self->socket;
+    unless($sock and $sock->connected) {
+        return 0;
+    }
+
+    # switch to non-blocking mode for the drain, remembering the
+    # original flags so they can be restored afterwards
+    my $orig_flags = fcntl($sock, F_GETFL, 0);
+    fcntl($sock, F_SETFL, $orig_flags | O_NONBLOCK);
+
+    my $discarded;
+    while(my $count = sysread($sock, $discarded, 8192)) {
+        $logger->debug("flush_socket dropped $count bytes of data");
+        next if $sock->connected;
+        $logger->error("flush_socket dropped data on disconnected socket: $discarded");
+    }
+
+    fcntl($sock, F_SETFL, $orig_flags);
+    return 0 unless $sock->connected;
+    return 1;
+}
+
+
+
+
+
+1;
+
+
+
+
+

Modified: trunk/src/perlmods/OpenSRF/Transport.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport.pm	2008-03-31 20:35:20 UTC (rev 1295)
+++ trunk/src/perlmods/OpenSRF/Transport.pm	2008-03-31 20:47:41 UTC (rev 1296)
@@ -81,16 +81,13 @@
 
 	$logger->transport( "Transport handler() received $data", INTERNAL );
 
-	# pass data to the message envelope 
-	my $helper = OpenSRF::Transport::SlimJabber::MessageWrapper->new( $data );
+	my $remote_id	= $data->from;
+	my $sess_id	= $data->thread;
+	my $body	= $data->body;
+	my $type	= $data->type;
 
-	# Extract message information
-	my $remote_id	= $helper->get_remote_id();
-	my $sess_id	= $helper->get_sess_id();
-	my $body	= $helper->get_body();
-	my $type	= $helper->get_msg_type();
+	$logger->set_osrf_xid($data->osrf_xid);
 
-	$logger->set_osrf_xid($helper->get_osrf_xid);
 
 	if (defined($type) and $type eq 'error') {
 		throw OpenSRF::EX::Session ("$remote_id IS NOT CONNECTED TO THE NETWORK!!!");
@@ -129,8 +126,8 @@
 	eval { $doc = OpenSRF::Utils::JSON->JSON2perl($body); };
 	if( $@ ) {
 
-		$logger->transport( "Received bogus JSON: $@", INFO );
-		$logger->transport( "Bogus JSON data: \n $body \n", INTERNAL );
+		$logger->warn("Received bogus JSON: $@");
+		$logger->warn("Bogus JSON data: $body");
 		my $res = OpenSRF::DomainObject::oilsXMLParseError->new( status => "JSON Parse Error --- $body\n\n$@" );
 
 		$app_session->status($res);

Modified: trunk/src/perlmods/OpenSRF/UnixServer.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/UnixServer.pm	2008-03-31 20:35:20 UTC (rev 1295)
+++ trunk/src/perlmods/OpenSRF/UnixServer.pm	2008-03-31 20:47:41 UTC (rev 1296)
@@ -14,6 +14,7 @@
 use vars qw/@ISA $app/;
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Carp;
+use FreezeThaw qw/thaw/;
 
 use IO::Socket::INET;
 use IO::Socket::UNIX;
@@ -97,6 +98,7 @@
 		exit;
 	}
 
+    ($data) = thaw($data);
 	my $app_session = OpenSRF::Transport->handler( $self->app(), $data );
 
 	if(!ref($app_session)) {



More information about the opensrf-commits mailing list