[Opensrf-commits] r1296 - in trunk/src/perlmods/OpenSRF: .
Transport/SlimJabber
svn at svn.open-ils.org
svn at svn.open-ils.org
Mon Mar 31 17:23:45 EDT 2008
Author: erickson
Date: 2008-03-31 16:47:41 -0400 (Mon, 31 Mar 2008)
New Revision: 1296
Added:
trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm
trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm
Modified:
trunk/src/perlmods/OpenSRF/Transport.pm
trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Inbound.pm
trunk/src/perlmods/OpenSRF/Transport/SlimJabber/MessageWrapper.pm
trunk/src/perlmods/OpenSRF/Transport/SlimJabber/PeerConnection.pm
trunk/src/perlmods/OpenSRF/UnixServer.pm
Log:
This patch replaces the regex-based XML stream parsing mechanism with an XML::Parser (expat) based parser.
The API and parsing behavior should remain identical.
This requires a new OpenSRF Perl dependency -> FreezeThaw
Modified: trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm 2008-03-31 20:35:20 UTC (rev 1295)
+++ trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm 2008-03-31 20:47:41 UTC (rev 1296)
@@ -1,595 +1,126 @@
package OpenSRF::Transport::SlimJabber::Client;
use strict; use warnings;
use OpenSRF::EX;
-use base qw( OpenSRF );
-use OpenSRF::Utils::Logger qw(:level);
use OpenSRF::Utils::Config;
-use Time::HiRes qw(ualarm);
-use OpenSRF::Utils::Config;
-
-use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
-use IO::Socket::INET;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::Transport::SlimJabber::XMPPReader;
+use OpenSRF::Transport::SlimJabber::XMPPMessage;
use IO::Socket::UNIX;
+use FreezeThaw qw/freeze/;
-=head1 Description
-
-OpenSRF::Transport::SlimJabber::Client
-
-Home-brewed slimmed down jabber connection agent. Supports SSL connections
-with a config file options:
-
- transport->server->sslport # the ssl port
- transport->server->ssl # is this ssl?
-
-=cut
-
-my $logger = "OpenSRF::Utils::Logger";
-
sub DESTROY{
- my $self = shift;
- $self->disconnect;
+ shift()->disconnect;
}
-sub disconnect{
- my $self = shift;
- my $socket = $self->{_socket};
- if( $socket and $socket->connected() ) {
- print $socket "</stream:stream>";
- close( $socket );
- }
-}
-
-
-=head2 new()
-
-Creates a new Client object.
-
-debug and log_file are not required if you don't care to log the activity,
-however all other parameters are.
-
-%params:
-
- username
- resource
- password
- debug
- log_file
-
-=cut
-
sub new {
-
my( $class, %params ) = @_;
-
- $class = ref( $class ) || $class;
-
- my $port = $params{'port'} || return undef;
- my $username = $params{'username'} || return undef;
- my $resource = $params{'resource'} || return undef;
- my $password = $params{'password'} || return undef;
- my $host = $params{'host'} || return undef;
-
- my $jid = "$username\@$host\/$resource";
-
- my $self = bless {} => $class;
-
- $self->jid( $jid );
- $self->host( $host );
- $self->port( $port );
- $self->username( $username );
- $self->resource( $resource );
- $self->password( $password );
- $self->{temp_buffer} = "";
-
- $logger->transport( "Creating Client instance: $host:$port, $username, $resource",
- $logger->INFO );
-
+ my $self = bless({}, ref($class) || $class);
+ $self->params(\%params);
return $self;
}
-# clears the tmp buffer as well as the TCP buffer
-sub buffer_reset {
- my $self = shift;
- $self->{temp_buffer} = "";
-
- my $fh = $self->{_socket};
- set_nonblock( $fh );
- my $t_buf = "";
- while( sysread( $fh, $t_buf, 4096 ) ) {}
- set_block( $fh );
+sub reader {
+ my($self, $reader) = @_;
+ $self->{reader} = $reader if $reader;
+ return $self->{reader};
}
-# -------------------------------------------------
-=head2 gather()
-
-Gathers all Jabber messages sitting in the collection queue
-and hands them each to their respective callbacks. This call
-does not block (calls Process(0))
-
-=cut
-
-sub gather { my $self = shift; $self->process( 0 ); }
-
-# -------------------------------------------------
-
-=head2 listen()
-
-Blocks and gathers incoming messages as they arrive. Does not return
-unless an error occurs.
-
-Throws an OpenSRF::EX::JabberException if the call to Process ever fails.
-
-=cut
-sub listen {
- my $self = shift;
-
- my $sock = $self->unix_sock();
- my $socket = IO::Socket::UNIX->new( Peer => $sock );
- $logger->transport( "Unix Socket opened by Listener", INTERNAL );
-
- throw OpenSRF::EX::Socket( "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
- unless ($socket->connected);
-
- while(1) {
- my $o = $self->process( -1 );
- $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
- if( ! defined( $o ) ) {
- throw OpenSRF::EX::Jabber( "Listen Loop failed at 'process()'" );
- }
- print $socket $o;
-
- }
- throw OpenSRF::EX::Socket( "How did we get here?!?!" );
+sub params {
+ my($self, $params) = @_;
+ $self->{params} = $params if $params;
+ return $self->{params};
}
-sub set_nonblock {
- my $fh = shift;
- my $flags = fcntl($fh, F_GETFL, 0)
- or die "Can't get flags for the socket: $!\n";
-
- $logger->transport( "Setting NONBLOCK: original flags: $flags", INTERNAL );
-
- fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
- or die "Can't set flags for the socket: $!\n";
-
- return $flags;
+sub socket {
+ my($self, $socket) = @_;
+ $self->{socket} = $socket if $socket;
+ return $self->{socket};
}
-sub reset_fl {
- my $fh = shift;
- my $flags = shift;
- $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
- fcntl($fh, F_SETFL, $flags) if defined $flags;
+sub disconnect {
+ my $self = shift;
+ $self->reader->disconnect if $self->reader;
}
-sub set_block {
- my $fh = shift;
- my $flags = fcntl($fh, F_GETFL, 0)
- or die "Can't get flags for the socket: $!\n";
-
- $flags &= ~O_NONBLOCK;
-
- fcntl($fh, F_SETFL, $flags)
- or die "Can't set flags for the socket: $!\n";
+sub gather {
+ my $self = shift;
+ $self->process( 0 );
}
-
-sub timed_read {
- my ($self, $timeout) = @_;
- $timeout = defined($timeout) ? int($timeout) : undef;
-
- $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
- if( $self->can( "app" ) ) {
- $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
- }
-
- # See if there is a complete message in the temp_buffer
- # that we can return
- if( $self->{temp_buffer} ) {
- my $buffer = $self->{temp_buffer};
- my $complete = 0;
- $self->{temp_buffer} = '';
-
- my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o);
- $self->{last_tag} = $tag;
- $logger->transport("Using tag: $tag ", INTERNAL);
-
- if ( $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
- $buffer = $1;
- $self->{temp_buffer} = $2;
- $complete++;
- $logger->transport( "completed read with $buffer", INTERNAL );
- } elsif ( $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
- $self->{temp_buffer} = $1;
- $complete++;
- $logger->transport( "completed read with $buffer", INTERNAL );
- } else {
- $self->{temp_buffer} = $buffer;
- }
-
- if( $buffer and $complete ) {
- return $buffer;
- }
-
- }
- ############
-
- my $fh = $self->{_socket};
-
- unless( $fh and $fh->connected ) {
- throw OpenSRF::EX::Socket ("Attempted read on closed socket", ERROR );
- }
-
- $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
-
- my $flags;
- if (defined($timeout) && !$timeout) {
- $flags = set_nonblock( $fh );
- }
-
- $timeout ||= 0;
- $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );
-
-
- my $complete = 0;
- my $first_read = 1;
- my $xml = '';
- eval {
- my $tag = '';
- eval {
- no warnings;
- local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
-
- # alarm needs a number greater => 1.
- my $alarm_timeout = $timeout;
- if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
- $alarm_timeout = 1;
- }
- alarm $alarm_timeout;
- do {
-
- my $buffer = $self->{temp_buffer};
- $self->{temp_buffer} = '';
- #####
-
- # This code is no longer in use
- #my $ff = fcntl($fh, F_GETFL, 0);
- #if ($ff == ($ff | O_NONBLOCK) and $timeout > 0 ) {
- #throw OpenSRF::EX::ERROR ("File flags are set to NONBLOCK but timeout is $timeout", ERROR );
- #}
-
- my $t_buf = "";
- my $read_size = 1024; my $f = 0;
- while( my $n = sysread( $fh, $t_buf, $read_size ) ) {
-
- unless( $fh->connected ) {
- OpenSRF::EX::JabberDisconnected->throw(
- "Lost jabber client in timed_read()");
- }
-
- # XXX Change me to debug/internal at some point, this is for testing...
- # XXX Found a race condition where reading >= $read_size bytes of data
- # will fail if the log line below is removed.
- $logger->info("timed_read() read $n bytes of data");
-
-
- $buffer .= $t_buf;
- if( $n < $read_size ) {
- #reset_fl( $fh, $f ) if $f;
- set_block( $fh );
- last;
- }
- # see if there is any more data to grab...
- $f = set_nonblock( $fh );
- }
-
- #sysread($fh, $buffer, 2048, length($buffer) );
- #sysread( $fh, $t_buf, 2048 );
- #$buffer .= $t_buf;
-
- #####
- $logger->transport(" Got [$buffer] from the socket", INTERNAL);
-
- if ($first_read) {
- $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
- ($tag) = ($buffer =~ /<([^\s\?\>\/]+){1}/o);
- $self->{last_tag} = $tag;
- $first_read--;
- $logger->transport("Using tag: $tag ", INTERNAL);
- }
-
- if (!$first_read && $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
- $buffer = $1;
- $self->{temp_buffer} = $2;
- $complete++;
- $logger->transport( "completed read with $buffer", INTERNAL );
- } elsif (!$first_read && $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
- $self->{temp_buffer} = $1;
- $complete++;
- $logger->transport( "completed read with $buffer", INTERNAL );
- }
-
- $xml .= $buffer;
-
- } while (!$complete && $xml);
- alarm(0);
- };
- alarm(0);
- };
-
- $logger->transport( "XML Read: $xml", INTERNAL );
- #reset_fl( $fh, $flags) if defined $flags;
- set_block( $fh ) if defined $flags;
-
- if ($complete) {
- return $xml;
- }
- if( $@ ) {
- return undef;
- }
- return "";
-}
-
-
# -------------------------------------------------
sub tcp_connected {
-
my $self = shift;
- return 1 if ($self->{_socket} and $self->{_socket}->connected);
- return 0;
+ return $self->reader->tcp_connected if $self->reader;
+ return 0;
}
-sub password {
- my( $self, $password ) = @_;
- $self->{'oils:password'} = $password if $password;
- return $self->{'oils:password'};
-}
-# -------------------------------------------------
-sub username {
- my( $self, $username ) = @_;
- $self->{'oils:username'} = $username if $username;
- return $self->{'oils:username'};
-}
-
-# -------------------------------------------------
-
-sub resource {
- my( $self, $resource ) = @_;
- $self->{'oils:resource'} = $resource if $resource;
- return $self->{'oils:resource'};
-}
-
-# -------------------------------------------------
-
-sub jid {
- my( $self, $jid ) = @_;
- $self->{'oils:jid'} = $jid if $jid;
- return $self->{'oils:jid'};
-}
-
-sub port {
- my( $self, $port ) = @_;
- $self->{'oils:port'} = $port if $port;
- return $self->{'oils:port'};
-}
-
-sub host {
- my( $self, $host ) = @_;
- $self->{'oils:host'} = $host if $host;
- return $self->{'oils:host'};
-}
-
-# -------------------------------------------------
-
-=head2 send()
-
- Sends a Jabber message.
-
- %params:
- to - The JID of the recipient
- thread - The Jabber thread
- body - The body of the message
-
-=cut
-
sub send {
my $self = shift;
- my %params = @_;
-
- my $to = $params{'to'} || return undef;
- my $body = $params{'body'} || return undef;
- my $thread = $params{'thread'} || "";
- my $router_command = $params{'router_command'} || "";
- my $router_class = $params{'router_class'} || "";
-
- my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;
-
- $msg->setTo( $to );
- $msg->setThread( $thread ) if $thread;
- $msg->setBody( $body );
- $msg->set_router_command( $router_command );
- $msg->set_router_class( $router_class );
- $msg->set_osrf_xid($logger->get_osrf_xid);
-
- $logger->transport(
- "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );
-
- my $soc = $self->{_socket};
- unless( $soc and $soc->connected ) {
- throw OpenSRF::EX::Jabber ("No longer connected to jabber server");
- }
- print $soc $msg->toString;
-
- $logger->transport(
- "JabberClient Sent message to $to with thread $thread and body: \n$body", INTERNAL );
+ my $msg = OpenSRF::Transport::SlimJabber::XMPPMessage->new(@_);
+ $self->reader->send($msg->to_xml);
}
-
-=head2 inintialize()
-
-Connect to the server and log in.
-
-Throws an OpenSRF::EX::JabberException if we cannot connect
-to the server or if the authentication fails.
-
-=cut
-
-# --- The logging lines have been commented out until we decide
-# on which log files we're using.
-
sub initialize {
my $self = shift;
- my $jid = $self->jid;
- my $host = $self->host;
- my $port = $self->port;
- my $username = $self->username;
- my $resource = $self->resource;
- my $password = $self->password;
+ my $host = $self->params->{host};
+ my $port = $self->params->{port};
+ my $username = $self->params->{username};
+ my $resource = $self->params->{resource};
+ my $password = $self->params->{password};
- my $stream = <<" XML";
-<stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
- XML
+ my $jid = "$username\@$host/$resource";
my $conf = OpenSRF::Utils::Config->current;
+
my $tail = "_$$";
- if(!$conf->bootstrap->router_name && $username eq "router") {
- $tail = "";
- }
+ $tail = "" if !$conf->bootstrap->router_name and $username eq "router";
+ $resource = "$resource$tail";
- my $auth = <<" XML";
-<iq id='123' type='set'>
-<query xmlns='jabber:iq:auth'>
-<username>$username</username>
-<password>$password</password>
-<resource>${resource}$tail</resource>
-</query>
-</iq>
- XML
+ my $socket = IO::Socket::INET->new(
+ PeerHost => $host,
+ PeerPort => $port,
+ Peer => $port,
+ Proto => 'tcp' );
- my $sock_type = 'IO::Socket::INET';
-
- # if port is a string, then we're connecting to a UNIX socket
- unless( $port =~ /^\d+$/ ) {
- $sock_type = 'IO::Socket::UNIX';
- }
+ throw OpenSRF::EX::Jabber("Could not open TCP socket to Jabber server: $!")
+ unless ( $socket and $socket->connected );
- # --- 5 tries to connect to the jabber server
- my $socket;
- for(1..5) {
- $socket = $sock_type->new( PeerHost => $host,
- PeerPort => $port,
- Peer => $port,
- Proto => 'tcp' );
- $logger->debug( "$jid: $_ connect attempt to $host:$port");
- last if ( $socket and $socket->connected );
- $logger->warn( "$jid: Failed to connect to server...$host:$port (Try # $_)");
- sleep 3;
- }
+ $self->socket($socket);
+ $self->reader(OpenSRF::Transport::SlimJabber::XMPPReader->new($socket));
+ $self->reader->connect($host, $username, $password, $resource);
- unless ( $socket and $socket->connected ) {
- throw OpenSRF::EX::Jabber( " Could not connect to Jabber server: $!" );
- }
+ throw OpenSRF::EX::Jabber("Could not authenticate with Jabber server: $!")
+ unless ( $self->reader->connected );
- $logger->transport( "Logging into jabber as $jid " .
- "from " . ref( $self ), DEBUG );
-
- print $socket $stream;
-
- my $buffer;
- eval {
- eval {
- local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
- alarm 3;
- sysread($socket, $buffer, 4096);
- $logger->transport( "Login buffer 1: $buffer", INTERNAL );
- alarm(0);
- };
- alarm(0);
- };
-
- print $socket $auth;
-
- if( $socket and $socket->connected() ) {
- $self->{_socket} = $socket;
- } else {
- throw OpenSRF::EX::Jabber( " ** Unable to connect to Jabber server", ERROR );
- }
-
-
- $buffer = $self->timed_read(10);
-
- if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}
-
- if( $buffer and $buffer =~ /type=["\']result["\']/ ) {
- $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
- } else {
- if( !$buffer ) { $buffer = " "; }
- $socket->close;
- throw OpenSRF::EX::Jabber( " * $jid: Unable to authenticate: $buffer", ERROR );
- }
-
return $self;
}
+
sub construct {
my( $class, $app ) = @_;
- $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );
- $class->peer_handle(
- $class->new( $app )->initialize() );
+ $class->peer_handle($class->new( $app )->initialize());
}
+
sub process {
+ my($self, $timeout) = @_;
- my( $self, $timeout ) = @_;
-
$timeout ||= 0;
$timeout = int($timeout);
- undef $timeout if ( $timeout < 0 );
- unless( $self->{_socket}->connected ) {
- OpenSRF::EX::JabberDisconnected->throw(
- "This JabberClient instance is no longer connected to the server " .
- $self->username . " : " . $self->resource, ERROR );
+ unless( $self->reader and $self->reader->connected ) {
+ throw OpenSRF::EX::JabberDisconnected
+ ("This JabberClient instance is no longer connected to the server ");
}
- my $val = $self->timed_read( $timeout );
-
- $timeout = "FOREVER" unless ( defined $timeout );
-
- if ( ! defined( $val ) ) {
- OpenSRF::EX::Jabber->throw(
- "Call to Client->timed_read( $timeout ) failed", ERROR );
- } elsif ( ! $val ) {
- $logger->transport(
- "Call to Client->timed_read( $timeout ) returned 0 bytes of data", INTERNAL );
- } elsif ( $val ) {
- $logger->transport(
- "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
- }
-
- my $t = $self->{last_tag};
-
- if( $t and $val ) {
- my @msgs = $val =~ /(<$t[^>]*>.*?<\/$t>)/g;
- $val = shift(@msgs);
-
- if (@msgs) {
- my $tmp = $self->{temp_buffer};
-
- $self->{temp_buffer} = '';
- $self->{temp_buffer} .= $_ for (@msgs);
- $self->{temp_buffer} .= $tmp;
- }
- }
-
- return $val;
+ return $self->reader->wait_msg($timeout);
}
@@ -599,34 +130,10 @@
# Returns 1 on success, 0 if the socket isn't connected
# --------------------------------------------------------------
sub flush_socket {
-
my $self = shift;
- my $socket = $self->{_socket};
-
- if( $socket ) {
-
- my $buf;
- my $flags = fcntl($socket, F_GETFL, 0);
-
- fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
- while( my $n = sysread( $socket, $buf, 8192 ) ) {
- $logger->debug("flush_socket dropped $n bytes of data");
- if(!$socket->connected()) {
- $logger->error("flush_socket dropped data on disconnected socket: $buf");
- }
- }
- fcntl($socket, F_SETFL, $flags);
-
- return 0 unless $socket->connected();
-
- return 1;
-
- } else {
-
- return 0;
- }
+ return $self->reader->flush_socket;
}
+1;
-1;
Modified: trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Inbound.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Inbound.pm 2008-03-31 20:35:20 UTC (rev 1295)
+++ trunk/src/perlmods/OpenSRF/Transport/SlimJabber/Inbound.pm 2008-03-31 20:47:41 UTC (rev 1296)
@@ -6,6 +6,7 @@
use OpenSRF::Utils::SettingsClient;
use OpenSRF::Utils::Config;
use Time::HiRes qw/usleep/;
+use FreezeThaw qw/freeze/;
my $logger = "OpenSRF::Utils::Logger";
@@ -20,9 +21,6 @@
=cut
-# XXX This will be overhauled to connect as a component instead of as
-# a user. all in good time, though.
-
{
my $unix_sock;
sub unix_sock { return $unix_sock; }
@@ -98,7 +96,6 @@
$logger->info("loading router info $routers");
for my $router (@$routers) {
-
if(ref $router) {
if( !$router->{services} || grep { $_ eq $self->{app} } @{$router->{services}->{service}} ) {
my $name = $router->{name};
@@ -134,10 +131,8 @@
$logger->debug("Inbound listener calling process()");
try {
- $o = $self->process( -1 );
+ $o = $self->process(-1);
- $logger->debug("Inbound listener received ".length($o)." bytes of data");
-
if(!$o){
$logger->error(
"Inbound received no data from the Jabber socket in process()");
@@ -158,7 +153,7 @@
throw OpenSRF::EX::Socket(
"Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
unless ($socket->connected);
- print $socket $o;
+ print $socket freeze($o);
$socket->close;
}
}
Modified: trunk/src/perlmods/OpenSRF/Transport/SlimJabber/MessageWrapper.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport/SlimJabber/MessageWrapper.pm 2008-03-31 20:35:20 UTC (rev 1295)
+++ trunk/src/perlmods/OpenSRF/Transport/SlimJabber/MessageWrapper.pm 2008-03-31 20:47:41 UTC (rev 1296)
@@ -1,125 +1,72 @@
package OpenSRF::Transport::SlimJabber::MessageWrapper;
-use XML::LibXML;
-use OpenSRF::EX qw/:try/;
-use OpenSRF::Utils::Logger qw/$logger/;
+use strict; use warnings;
+use OpenSRF::Transport::SlimJabber::XMPPMessage;
+# ----------------------------------------------------------
+# Legacy wrapper for XMPPMessage
+# ----------------------------------------------------------
+
sub new {
my $class = shift;
- $class = ref($class) || $class;
+ my $msg = shift;
+ return bless({msg => $msg}, ref($class) || $class);
+}
- my $xml = shift;
-
- my ($doc, $msg);
- if ($xml) {
- my $err;
-
- try {
- $doc = XML::LibXML->new->parse_string($xml);
- } catch Error with {
- $err = shift;
- warn "MessageWrapper received bad XML : error = $err\nXML = $xml\n";
- $logger->error("MessageWrapper received bad XML : error = $err : XML = $xml");
- };
- throw $err if $err;
-
- $msg = $doc->documentElement;
- } else {
- $doc = XML::LibXML::Document->createDocument;
- $msg = $doc->createElement( 'message' );
- $doc->setDocumentElement( $msg );
- }
-
-
- my $self = { msg_node => $msg };
-
- return bless $self => $class;
+sub msg {
+ my($self, $msg) = @_;
+ $self->{msg} = $msg if $msg;
+ return $self->{msg};
}
sub toString {
- my $self = shift;
- if( $self->{msg_node} ) {
- return $self->{msg_node}->toString(@_);
- }
- return "";
+ return $_[0]->msg->to_xml;
}
sub get_body {
- my $self = shift;
- my ($t_body) = grep {$_->nodeName eq 'body'} $self->{msg_node}->childNodes;
- if( $t_body ) {
- my $body = $t_body->textContent;
- return $body;
- }
- return "";
+ return $_[0]->msg->body;
}
sub get_sess_id {
- my $self = shift;
- my ($t_node) = grep {$_->nodeName eq 'thread'} $self->{msg_node}->childNodes;
- if( $t_node ) {
- return $t_node->textContent;
- }
- return "";
+ return $_[0]->msg->thread;
}
sub get_msg_type {
- my $self = shift;
- $self->{msg_node}->getAttribute( 'type' );
+ return $_[0]->msg->type;
}
sub get_remote_id {
- my $self = shift;
-
- #
- my $rid = $self->{msg_node}->getAttribute( 'router_from' );
- return $rid if $rid;
-
- return $self->{msg_node}->getAttribute( 'from' );
+ return $_[0]->msg->from;
}
sub setType {
- my $self = shift;
- $self->{msg_node}->setAttribute( type => shift );
+ # Sets the 'type' of the wrapped message.
+ # @_ is unpacked up front: in `$_[0]->msg->type(shift())` the invocant is
+ # evaluated before shift() runs, so shift() would return the wrapper
+ # object itself instead of the new value.
+ my($self, $type) = @_;
+ $self->msg->type($type);
}
sub setTo {
- my $self = shift;
- $self->{msg_node}->setAttribute( to => shift );
+ # Sets the 'to' field of the wrapped message.
+ # @_ is unpacked up front: in `$_[0]->msg->to(shift())` the invocant is
+ # evaluated before shift() runs, so shift() would return the wrapper
+ # object itself instead of the new value.
+ my($self, $to) = @_;
+ $self->msg->to($to);
}
sub setThread {
- my $self = shift;
- $self->{msg_node}->appendTextChild( thread => shift );
+ # Sets the 'thread' field of the wrapped message.
+ # @_ is unpacked up front: in `$_[0]->msg->thread(shift())` the invocant
+ # is evaluated before shift() runs, so shift() would return the wrapper
+ # object itself instead of the new value.
+ my($self, $thread) = @_;
+ $self->msg->thread($thread);
}
sub setBody {
- my $self = shift;
- my $body = shift;
- $self->{msg_node}->appendTextChild( body => $body );
+ # Sets the 'body' field of the wrapped message.
+ # @_ is unpacked up front: in `$_[0]->msg->body(shift())` the invocant is
+ # evaluated before shift() runs, so shift() would return the wrapper
+ # object itself instead of the new value.
+ my($self, $body) = @_;
+ $self->msg->body($body);
}
sub set_router_command {
- my( $self, $router_command ) = @_;
- if( $router_command ) {
- $self->{msg_node}->setAttribute( router_command => $router_command );
- }
+ # Sets the 'router_command' field of the wrapped message.
+ # @_ is unpacked up front: in `$_[0]->msg->router_command(shift())` the
+ # invocant is evaluated before shift() runs, so shift() would return the
+ # wrapper object itself instead of the new value.
+ my($self, $router_command) = @_;
+ $self->msg->router_command($router_command);
}
sub set_router_class {
- my( $self, $router_class ) = @_;
- if( $router_class ) {
- $self->{msg_node}->setAttribute( router_class => $router_class );
- }
+ # Sets the 'router_class' field of the wrapped message.
+ # @_ is unpacked up front: in `$_[0]->msg->router_class(shift())` the
+ # invocant is evaluated before shift() runs, so shift() would return the
+ # wrapper object itself instead of the new value.
+ my($self, $router_class) = @_;
+ $self->msg->router_class($router_class);
}
sub set_osrf_xid {
- my( $self, $xid ) = @_;
- $self->{msg_node}->setAttribute( osrf_xid => $xid );
+ # Sets the 'osrf_xid' field of the wrapped message.
+ # @_ is unpacked up front: in `$_[0]->msg->osrf_xid(shift())` the invocant
+ # is evaluated before shift() runs, so shift() would return the wrapper
+ # object itself instead of the new value.
+ my($self, $xid) = @_;
+ $self->msg->osrf_xid($xid);
}
-
sub get_osrf_xid {
- my $self = shift;
- $self->{msg_node}->getAttribute('osrf_xid');
+ return $_[0]->msg->osrf_xid;
}
1;
Modified: trunk/src/perlmods/OpenSRF/Transport/SlimJabber/PeerConnection.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport/SlimJabber/PeerConnection.pm 2008-03-31 20:35:20 UTC (rev 1295)
+++ trunk/src/perlmods/OpenSRF/Transport/SlimJabber/PeerConnection.pm 2008-03-31 20:47:41 UTC (rev 1296)
@@ -29,15 +29,9 @@
sub retrieve {
my( $class, $app ) = @_;
return $_singleton_connection;
-# my @keys = keys %apps_hash;
-# OpenSRF::Utils::Logger->transport(
-# "Requesting peer for $app and we have @keys", INFO );
-# return $apps_hash{$app};
}
-
-# !! In here we use the bootstrap config ....
sub new {
my( $class, $app ) = @_;
@@ -63,12 +57,6 @@
if( $app eq "client" ) { $resource = "client_at_$h"; }
-# unless ( $conf->bootstrap->router_name ) {
-# $username = 'router';
-# $resource = $app;
-# }
-
-
OpenSRF::EX::Config->throw( "JPeer could not load all necesarry values from config" )
unless ( $username and $password and $resource and $host and $port );
@@ -94,26 +82,16 @@
}
sub process {
-
my $self = shift;
my $val = $self->SUPER::process(@_);
return 0 unless $val;
-
- OpenSRF::Utils::Logger->transport( "Calling transport handler for ".$self->app." with: $val", INTERNAL );
- my $t;
- $t = OpenSRF::Transport->handler($self->app, $val);
-
- return $t;
+ return OpenSRF::Transport->handler($self->app, $val);
}
sub app {
my $self = shift;
my $app = shift;
- if( $app ) {
- OpenSRF::Utils::Logger->transport( "PEER changing app to $app: ".$self->jid, INTERNAL );
- }
-
- $self->{app} = $app if ($app);
+ $self->{app} = $app if $app;
return $self->{app};
}
Added: trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm (rev 0)
+++ trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm 2008-03-31 20:47:41 UTC (rev 1296)
@@ -0,0 +1,134 @@
+package OpenSRF::Transport::SlimJabber::XMPPMessage;
+use strict; use warnings;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::EX qw/:try/;
+use strict; use warnings;
+use XML::LibXML;
+
+use constant JABBER_MESSAGE =>
+ "<message to='%s' from='%s' router_command='%s' router_class='%s' osrf_xid='%s'>".
+ "<thread>%s</thread><body>%s</body></message>";
+
+# Constructor.  Takes key/value arguments.
+#   xml => STRING  : the object is populated by handing the string to parse_xml
+#   otherwise      : to, from, thread, body, osrf_xid, router_command and
+#                    router_class are copied from the arguments; any that is
+#                    missing or false is stored as the empty string
+sub new {
+    my($class, %args) = @_;
+    my $self = bless({}, $class);
+
+    if($args{xml}) {
+        $self->parse_xml($args{xml});
+        return $self;
+    }
+
+    for my $field (qw/to from thread body osrf_xid router_command router_class/) {
+        $self->{$field} = $args{$field} || '';
+    }
+
+    return $self;
+}
+
+# Accessor for the 'to' field: stores the argument when it is defined and
+# always returns the current value.
+sub to {
+    my $self = shift;
+    $self->{to} = $_[0] if defined $_[0];
+    return $self->{to};
+}
+# Accessor for the 'from' field: stores the argument when it is defined and
+# always returns the current value.
+sub from {
+    my $self = shift;
+    $self->{from} = $_[0] if defined $_[0];
+    return $self->{from};
+}
+# Accessor for the 'thread' field: stores the argument when it is defined
+# and always returns the current value.
+sub thread {
+    my $self = shift;
+    $self->{thread} = $_[0] if defined $_[0];
+    return $self->{thread};
+}
+# Accessor for the 'body' field: stores the argument when it is defined and
+# always returns the current value.
+sub body {
+    my $self = shift;
+    $self->{body} = $_[0] if defined $_[0];
+    return $self->{body};
+}
+# Accessor for the 'status' field: stores the argument when it is defined
+# and always returns the current value.
+sub status {
+    my $self = shift;
+    $self->{status} = $_[0] if defined $_[0];
+    return $self->{status};
+}
+# Accessor for the 'type' field: stores the argument when it is defined and
+# always returns the current value.
+sub type {
+    my $self = shift;
+    $self->{type} = $_[0] if defined $_[0];
+    return $self->{type};
+}
+# Accessor for the 'err_type' field: stores the argument when it is defined
+# and always returns the current value.
+sub err_type {
+    my $self = shift;
+    $self->{err_type} = $_[0] if defined $_[0];
+    return $self->{err_type};
+}
+# Accessor for the 'err_code' field: stores the argument when it is defined
+# and always returns the current value.
+sub err_code {
+    my $self = shift;
+    $self->{err_code} = $_[0] if defined $_[0];
+    return $self->{err_code};
+}
+# Accessor for the 'osrf_xid' field: stores the argument when it is defined
+# and always returns the current value.
+sub osrf_xid {
+    my $self = shift;
+    $self->{osrf_xid} = $_[0] if defined $_[0];
+    return $self->{osrf_xid};
+}
+# Accessor for the 'router_command' field: stores the argument when it is
+# defined and always returns the current value.
+sub router_command {
+    my $self = shift;
+    $self->{router_command} = $_[0] if defined $_[0];
+    return $self->{router_command};
+}
+# Accessor for the 'router_class' field: stores the argument when it is
+# defined and always returns the current value.
+sub router_class {
+    my $self = shift;
+    $self->{router_class} = $_[0] if defined $_[0];
+    return $self->{router_class};
+}
+
+
+# Serializes this message into a <message/> stanza using the JABBER_MESSAGE
+# sprintf template.
+#
+# The body is XML-escaped before interpolation.  '&' is replaced first so
+# that the entities produced for '<' and '>' are not escaped a second time.
+# All other fields are interpolated as-is.  Fields that were never set are
+# rendered as empty strings rather than triggering undef warnings.
+#
+# Returns the XML string.
+sub to_xml {
+    my $self = shift;
+
+    my $body = defined $self->{body} ? $self->{body} : '';
+    $body =~ s/&/&amp;/sog;
+    $body =~ s/</&lt;/sog;
+    $body =~ s/>/&gt;/sog;
+
+    # same order as the placeholders in JABBER_MESSAGE (body is appended last)
+    my @fields = map { defined($_) ? $_ : '' }
+        @{$self}{qw/to from router_command router_class osrf_xid thread/};
+
+    return sprintf(JABBER_MESSAGE, @fields, $body);
+}
+
+# Populates this message from an XML <message/> string.
+#
+# Parses $xml with XML::LibXML and copies the body and thread text plus the
+# to, type and osrf_xid attributes into the object.  The sender is taken
+# from 'router_from' when that attribute is present (and true), otherwise
+# from 'from'.
+#
+# A parse failure is logged and then re-thrown to the caller.
+sub parse_xml {
+    my($self, $xml) = @_;
+    my($doc, $err);
+
+    try {
+        $doc = XML::LibXML->new->parse_string($xml);
+    } catch Error with {
+        # assign to the outer $err (no 'my'): a fresh lexical here would hide
+        # the failure from the re-throw below and leave $doc undefined
+        $err = shift;
+        $logger->error("Error parsing message xml: $xml --- $err");
+    };
+    throw $err if $err;
+
+    my $root = $doc->documentElement;
+
+    $self->{body} = $root->findnodes('/message/body').'';
+    $self->{thread} = $root->findnodes('/message/thread').'';
+    $self->{from} = $root->getAttribute('router_from');
+    $self->{from} = $root->getAttribute('from') unless $self->{from};
+    $self->{to} = $root->getAttribute('to');
+    $self->{type} = $root->getAttribute('type');
+    $self->{osrf_xid} = $root->getAttribute('osrf_xid');
+}
+
+
+1;
Added: trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm (rev 0)
+++ trunk/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm 2008-03-31 20:47:41 UTC (rev 1296)
@@ -0,0 +1,350 @@
+package OpenSRF::Transport::SlimJabber::XMPPReader;
+use strict; use warnings;
+use XML::Parser;
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+use Time::HiRes qw/time/;
+use OpenSRF::Transport::SlimJabber::XMPPMessage;
+use OpenSRF::Utils::Logger qw/$logger/;
+
+# -----------------------------------------------------------
+# Connect, disconnect, and authentication message templates.
+# The %s placeholders are filled in with sprintf by connect().
+# -----------------------------------------------------------
+
+# Opening stream header; %s is the XMPP domain.
+use constant JABBER_CONNECT =>
+ "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
+
+# jabber:iq:auth login request; placeholders are username, password
+# and resource, in that order.
+use constant JABBER_BASIC_AUTH =>
+ "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" .
+ "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
+
+# Closing stream tag sent by disconnect().
+use constant JABBER_DISCONNECT => "</stream:stream>";
+
+
+# -----------------------------------------------------------
+# XMPP Stream states
+#   DISCONNECTED - initial state; also set on stream errors and
+#                  when the stream element is closed
+#   CONNECT_RECV - the opening <stream:stream> has been seen
+#   CONNECTED    - an <iq type='result'> has been seen
+# -----------------------------------------------------------
+use constant DISCONNECTED => 1;
+use constant CONNECT_RECV => 2;
+use constant CONNECTED => 3;
+
+
+# -----------------------------------------------------------
+# XMPP Message states: which element's character data the
+# parser is currently collecting (see characters()).
+# -----------------------------------------------------------
+use constant IN_NOTHING => 1;
+use constant IN_BODY => 2;
+use constant IN_THREAD => 3;
+use constant IN_STATUS => 4;
+
+
+# -----------------------------------------------------------
+# Constructor, getter/setters
+# -----------------------------------------------------------
+sub new {
+    # Builds a reader around an already-open socket.  The reader starts
+    # with an empty message queue, a DISCONNECTED stream, an expat push
+    # parser wired to this package's handlers, and an empty in-progress
+    # XMPPMessage.
+    my $class = shift;
+    my $socket = shift;
+
+    my $self = bless({}, $class);
+
+    $self->{queue} = [];
+    $self->{stream_state} = DISCONNECTED;
+    $self->{xml_state} = IN_NOTHING;
+    $self->socket($socket);
+
+    # Direct method call instead of the ambiguous indirect-object
+    # "new XML::Parser(...)" form.
+    my $p = XML::Parser->new(Handlers => {
+        Start => \&start_element,
+        End => \&end_element,
+        Char => \&characters,
+    });
+
+    $self->parser($p->parse_start); # create a push parser
+
+    # The handlers reach the reader through the parser object.  The
+    # back-reference is weakened so that reader -> parser -> reader does
+    # not form a cycle that keeps the reader alive forever.
+    require Scalar::Util;
+    $self->parser->{_parent_} = $self;
+    Scalar::Util::weaken($self->parser->{_parent_});
+
+    $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
+    return $self;
+}
+
+sub push_msg {
+    # Appends a parsed message to the queue; false values are ignored.
+    my $self = shift;
+    my $msg = shift;
+    push @{ $self->{queue} }, $msg if $msg;
+}
+
+sub next_msg {
+    # Removes and returns the oldest queued message (undef when empty).
+    my($self) = @_;
+    my $queue = $self->{queue};
+    return shift @$queue;
+}
+
+sub peek_msg {
+    # True when at least one message is waiting in the queue.
+    my($self) = @_;
+    my $pending = scalar @{ $self->{queue} };
+    return ($pending > 0);
+}
+
+sub parser {
+    # Getter/setter for the push parser; a true argument replaces it.
+    my $self = shift;
+    my $new_parser = shift;
+    if($new_parser) {
+        $self->{parser} = $new_parser;
+    }
+    return $self->{parser};
+}
+
+sub socket {
+    my $self = shift;
+    # A true argument installs a new socket handle; otherwise just read.
+    $self->{socket} = $_[0] if $_[0];
+    return $self->{socket};
+}
+
+sub stream_state {
+    # Getter/setter for the stream state; only a true value is stored.
+    my($self, @args) = @_;
+    my $state = $args[0];
+    $self->{stream_state} = $state if $state;
+    return $self->{stream_state};
+}
+
+sub xml_state {
+    my $self = shift;
+    my $state = shift;
+    # Only a true value replaces the current parse state.
+    $state and $self->{xml_state} = $state;
+    return $self->{xml_state};
+}
+
+sub message {
+    # Getter/setter for the message currently being assembled.
+    my $self = shift;
+    my $replacement = shift;
+    $self->{message} = $replacement if $replacement;
+    return $self->{message};
+}
+
+
+# -----------------------------------------------------------
+# Stream and connection handling methods
+# -----------------------------------------------------------
+
+# -----------------------------------------------------------
+# Opens the XMPP stream for $domain and authenticates with
+# $username / $password / $resource.  Each step waits up to
+# 10 seconds for the server's reply.  Returns 1 on success and
+# 0 (after logging an error) on failure.
+# -----------------------------------------------------------
+sub connect {
+    my($self, $domain, $username, $password, $resource) = @_;
+
+    # All caller-supplied values are escaped before being placed in the
+    # XML templates; otherwise a password such as "a&b" or "x<y" would
+    # produce a malformed stanza.
+    $self->send(sprintf(JABBER_CONNECT, _xml_escape($domain)));
+    $self->wait(10);
+
+    unless($self->{stream_state} == CONNECT_RECV) {
+        $logger->error("No initial XMPP response from server");
+        return 0;
+    }
+
+    $self->send(sprintf(
+        JABBER_BASIC_AUTH,
+        _xml_escape($username),
+        _xml_escape($password),
+        _xml_escape($resource)
+    ));
+    $self->wait(10);
+
+    unless($self->connected) {
+        $logger->error('XMPP connect failed');
+        return 0;
+    }
+
+    return 1;
+}
+
+# -----------------------------------------------------------
+# Returns a copy of the given text that is safe to embed in
+# XML element content or a quoted attribute value.  undef is
+# treated as the empty string.
+# -----------------------------------------------------------
+sub _xml_escape {
+    my $text = shift;
+    return '' unless defined $text;
+    $text =~ s/&/&amp;/g;   # must run first
+    $text =~ s/</&lt;/g;
+    $text =~ s/>/&gt;/g;
+    $text =~ s/'/&apos;/g;
+    $text =~ s/"/&quot;/g;
+    return $text;
+}
+
+sub disconnect {
+    # Sends the closing stream tag, then shuts down both directions of
+    # the socket and closes it.
+    my($self) = @_;
+    $self->send(JABBER_DISCONNECT);
+
+    my $sock = $self->socket;
+    shutdown($sock, 2);
+    close($sock);
+}
+
+# -----------------------------------------------------------
+# returns true if this stream is connected to the server
+# -----------------------------------------------------------
+sub connected {
+    my($self) = @_;
+    # Without a live socket there is no point looking at the stream state.
+    my $tcp_up = $self->tcp_connected;
+    return $tcp_up unless $tcp_up;
+    return $self->{stream_state} == CONNECTED;
+}
+
+# -----------------------------------------------------------
+# returns true if the socket is connected
+# -----------------------------------------------------------
+sub tcp_connected {
+    my($self) = @_;
+    # No socket object at all counts as "not connected".
+    return $self->socket unless $self->socket;
+    return $self->socket->connected;
+}
+
+# -----------------------------------------------------------
+# sends pre-formatted XML
+# -----------------------------------------------------------
+sub send {
+    # Writes the given XML string to the socket as-is and returns the
+    # result of the print call.
+    my $self = shift;
+    my $payload = shift;
+    my $sock = $self->{socket};
+    return $sock->print($payload);
+}
+
+# -----------------------------------------------------------
+# Puts a file handle into blocking mode
+# -----------------------------------------------------------
+sub set_block {
+    my($handle) = @_;
+    # Read the current flags and write them back with O_NONBLOCK cleared.
+    my $mode = fcntl($handle, F_GETFL, 0);
+    $mode = $mode & ~O_NONBLOCK;
+    fcntl($handle, F_SETFL, $mode);
+}
+
+
+# -----------------------------------------------------------
+# Puts a file handle into non-blocking mode
+# -----------------------------------------------------------
+sub set_nonblock {
+    my($handle) = @_;
+    # Read the current flags and write them back with O_NONBLOCK added.
+    my $mode = fcntl($handle, F_GETFL, 0);
+    my $wanted = $mode | O_NONBLOCK;
+    fcntl($handle, F_SETFL, $wanted);
+}
+
+
+# -----------------------------------------------------------
+# Returns the next queued message, reading from the socket if
+# the queue is empty.  Waits up to $timeout seconds for data
+# to arrive (0 or undef: poll; negative: wait indefinitely).
+# Returns undef when nothing arrived in time or when the data
+# read did not complete a message.
+# -----------------------------------------------------------
+sub wait {
+    my($self, $timeout) = @_;
+
+    return $self->next_msg if $self->peek_msg;
+
+    $timeout ||= 0;
+    $timeout = undef if $timeout < 0;   # undef makes select() block forever
+    my $socket = $self->{socket};
+
+    set_block($socket);
+
+    # build the select readset
+    my $infile = '';
+    vec($infile, $socket->fileno, 1) = 1;
+
+    # select() yields 0 on timeout and -1 on error (e.g. an interrupting
+    # signal).  Only a positive count means there is data to read; treating
+    # -1 as "ready" would drop us into a blocking read.
+    my $ready = select($infile, undef, undef, $timeout);
+    return undef unless defined $ready and $ready > 0;
+
+    # Now slurp the data off the socket.  The first read is a blocking one;
+    # after a full-sized chunk the socket is switched to non-blocking so
+    # that an exactly-drained socket does not stall the loop.
+    my $read_size = 1024;
+    while(my $n = sysread($socket, my $buf, $read_size)) {
+        # $n > 0 guarantees $buf is non-empty.  Do not test $buf for
+        # truth: a chunk consisting of the single character "0" is false
+        # and would be silently dropped from the stream.
+        $self->{parser}->parse_more($buf);
+        last if $n < $read_size or $self->peek_msg;
+        set_nonblock($socket);
+    }
+
+    # Always hand the socket back in blocking mode, including when the
+    # loop ended because sysread() returned 0 or undef.
+    set_block($socket);
+
+    return $self->next_msg;
+}
+
+# -----------------------------------------------------------
+# Waits up to timeout seconds for a fully-formed XMPP
+# message to arrive. If timeout is < 0, waits indefinitely
+# -----------------------------------------------------------
+sub wait_msg {
+    my($self, $timeout) = @_;
+    $timeout = 0 unless defined $timeout;
+
+    # Negative timeout: keep waiting until a message shows up.
+    if($timeout < 0) {
+        for(;;) {
+            my $msg = $self->wait($timeout);
+            return $msg if $msg;
+        }
+    }
+
+    # Otherwise retry with whatever is left of the time budget; each
+    # attempt is charged the wall-clock time it actually consumed.
+    my $remaining = $timeout;
+    while($remaining >= 0) {
+        my $began = time;
+        my $msg = $self->wait($remaining);
+        return $msg if $msg;
+        $remaining -= time - $began;
+    }
+
+    return undef;
+}
+
+
+# -----------------------------------------------------------
+# SAX Handlers
+# -----------------------------------------------------------
+
+
+sub start_element {
+    # Start-tag handler: records message attributes, tracks which text
+    # element is open and updates the stream state.
+    my $parser = shift;
+    my $name = shift;
+    my %attrs = @_;
+    my $self = $parser->{_parent_};
+
+    # Elements whose character data is collected by characters().
+    my %text_state = (
+        body => IN_BODY,
+        thread => IN_THREAD,
+        status => IN_STATUS,
+    );
+
+    if(exists $text_state{$name}) {
+        $self->{xml_state} = $text_state{$name};
+
+    } elsif($name eq 'message') {
+        my $msg = $self->{message};
+        @{$msg}{qw/to osrf_xid type/} = @attrs{qw/to osrf_xid type/};
+
+        # router_from takes precedence over the plain from attribute.
+        $msg->{from} = $attrs{router_from} if $attrs{router_from};
+        $msg->{from} ||= $attrs{from};
+
+    } elsif($name eq 'stream:stream') {
+        $self->{stream_state} = CONNECT_RECV;
+
+    } elsif($name eq 'stream:error') {
+        $self->{stream_state} = DISCONNECTED;
+
+    } elsif($name eq 'iq') {
+        my $iq_type = $attrs{type};
+        $self->{stream_state} = CONNECTED if $iq_type and $iq_type eq 'result';
+
+    } elsif($name eq 'error') {
+        my $msg = $self->{message};
+        $msg->{err_type} = $attrs{'type'};
+        $msg->{err_code} = $attrs{'code'};
+        $self->{stream_state} = DISCONNECTED;
+    }
+}
+
+sub characters {
+    # Character-data handler: appends text to the message field that
+    # matches the element currently open; other text is ignored.
+    my $parser = shift;
+    my $chars = shift;
+    my $self = $parser->{_parent_};
+
+    my %field_for = (
+        IN_BODY() => 'body',
+        IN_THREAD() => 'thread',
+        IN_STATUS() => 'status',
+    );
+
+    my $field = $field_for{ $self->{xml_state} };
+    $self->{message}->{$field} .= $chars if $field;
+}
+
+sub end_element {
+    # End-tag handler: any closing tag ends text collection; a closing
+    # <message> queues the finished message and starts a fresh one; a
+    # closing <stream:stream> marks the stream as disconnected.
+    my $parser = shift;
+    my $name = shift;
+    my $reader = $parser->{_parent_};
+
+    $reader->{xml_state} = IN_NOTHING;
+
+    if($name eq 'stream:stream') {
+        $reader->{stream_state} = DISCONNECTED;
+
+    } elsif($name eq 'message') {
+        my $finished = $reader->{message};
+        $reader->push_msg($finished);
+        $reader->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
+    }
+}
+
+sub flush_socket {
+    # Reads and discards whatever is pending on the socket.  Returns 1 if
+    # the socket is connected afterwards, 0 otherwise.
+    my($self) = @_;
+    my $sock = $self->socket;
+    return 0 unless $sock;
+    return 0 unless $sock->connected;
+
+    # Switch to non-blocking for the drain; the saved flags are restored
+    # once the loop is done.
+    my $saved_flags = fcntl($sock, F_GETFL, 0);
+    fcntl($sock, F_SETFL, $saved_flags | O_NONBLOCK);
+
+    my $chunk;
+    while(my $count = sysread($sock, $chunk, 8192)) {
+        $logger->debug("flush_socket dropped $count bytes of data");
+        next if $sock->connected;
+        $logger->error("flush_socket dropped data on disconnected socket: $chunk");
+    }
+
+    fcntl($sock, F_SETFL, $saved_flags);
+    return $sock->connected ? 1 : 0;
+}
+
+
+
+
+
+1;
+
+
+
+
+
Modified: trunk/src/perlmods/OpenSRF/Transport.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/Transport.pm 2008-03-31 20:35:20 UTC (rev 1295)
+++ trunk/src/perlmods/OpenSRF/Transport.pm 2008-03-31 20:47:41 UTC (rev 1296)
@@ -81,16 +81,13 @@
$logger->transport( "Transport handler() received $data", INTERNAL );
- # pass data to the message envelope
- my $helper = OpenSRF::Transport::SlimJabber::MessageWrapper->new( $data );
+ my $remote_id = $data->from;
+ my $sess_id = $data->thread;
+ my $body = $data->body;
+ my $type = $data->type;
- # Extract message information
- my $remote_id = $helper->get_remote_id();
- my $sess_id = $helper->get_sess_id();
- my $body = $helper->get_body();
- my $type = $helper->get_msg_type();
+ $logger->set_osrf_xid($data->osrf_xid);
- $logger->set_osrf_xid($helper->get_osrf_xid);
if (defined($type) and $type eq 'error') {
throw OpenSRF::EX::Session ("$remote_id IS NOT CONNECTED TO THE NETWORK!!!");
@@ -129,8 +126,8 @@
eval { $doc = OpenSRF::Utils::JSON->JSON2perl($body); };
if( $@ ) {
- $logger->transport( "Received bogus JSON: $@", INFO );
- $logger->transport( "Bogus JSON data: \n $body \n", INTERNAL );
+ $logger->warn("Received bogus JSON: $@");
+ $logger->warn("Bogus JSON data: $body");
my $res = OpenSRF::DomainObject::oilsXMLParseError->new( status => "JSON Parse Error --- $body\n\n$@" );
$app_session->status($res);
Modified: trunk/src/perlmods/OpenSRF/UnixServer.pm
===================================================================
--- trunk/src/perlmods/OpenSRF/UnixServer.pm 2008-03-31 20:35:20 UTC (rev 1295)
+++ trunk/src/perlmods/OpenSRF/UnixServer.pm 2008-03-31 20:47:41 UTC (rev 1296)
@@ -14,6 +14,7 @@
use vars qw/@ISA $app/;
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Carp;
+use FreezeThaw qw/thaw/;
use IO::Socket::INET;
use IO::Socket::UNIX;
@@ -97,6 +98,7 @@
exit;
}
+ ($data) = thaw($data);
my $app_session = OpenSRF::Transport->handler( $self->app(), $data );
if(!ref($app_session)) {
More information about the opensrf-commits
mailing list