[open-ils-commits] r18219 - in trunk/Open-ILS: examples src/perlmods/OpenILS/Application src/perlmods/OpenILS/Application/Trigger (erickson)
svn at svn.open-ils.org
svn at svn.open-ils.org
Thu Oct 7 10:56:46 EDT 2010
Author: erickson
Date: 2010-10-07 10:56:43 -0400 (Thu, 07 Oct 2010)
New Revision: 18219
Modified:
trunk/Open-ILS/examples/opensrf.xml.example
trunk/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm
trunk/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm
Log:
Parallel action/trigger collection and reaction
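QA'ed patch from miker to support parallel action/trigger (a/t) event
collection and reaction. The maximum number of parallel processes for each
phase is controlled by two new open-ils.trigger app_settings in opensrf.xml:
parallel/collect and parallel/react. A sample config is included; the
settings are commented out by default, and each value falls back to 1 when
unset.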
Modified: trunk/Open-ILS/examples/opensrf.xml.example
===================================================================
--- trunk/Open-ILS/examples/opensrf.xml.example 2010-10-06 22:13:20 UTC (rev 18218)
+++ trunk/Open-ILS/examples/opensrf.xml.example 2010-10-07 14:56:43 UTC (rev 18219)
@@ -590,6 +590,15 @@
<min_spare_children>1</min_spare_children>
<max_spare_children>5</max_spare_children>
</unix_config>
+ <app_settings>
+ <!-- number of parallel open-ils.trigger processes to use for collection and reaction -->
+ <!--
+ <parallel>
+ <collect>3</collect>
+ <react>3</react>
+ </parallel>
+ -->
+ </app_settings>
</open-ils.trigger>
<opensrf.math>
Modified: trunk/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm
===================================================================
--- trunk/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm 2010-10-06 22:13:20 UTC (rev 18218)
+++ trunk/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm 2010-10-07 14:56:43 UTC (rev 18219)
@@ -2,13 +2,10 @@
use strict; use warnings;
use OpenSRF::EX qw/:try/;
use OpenSRF::Utils::JSON;
-
use OpenSRF::Utils::Logger qw/$logger/;
-
use OpenILS::Utils::Fieldmapper;
use OpenILS::Utils::CStoreEditor q/:funcs/;
use OpenILS::Application::Trigger::ModRunner;
-
use Safe;
my $log = 'OpenSRF::Utils::Logger';
@@ -19,11 +16,17 @@
my $editor = shift;
$class = ref($class) || $class;
- return $id if (ref($id) && ref($id) eq $class);
-
my $standalone = $editor ? 0 : 1;
$editor ||= new_editor();
+ if (ref($id) && ref($id) eq $class) {
+ $id->environment->{EventProcessor} = $id
+ if ($id->environment->{complete}); # in case it came over an opensrf tube
+ $id->editor( $editor );
+ $id->standalone( $standalone );
+ return $id;
+ }
+
my $self = bless { id => $id, editor => $editor, standalone => $standalone } => $class;
return $self->init()
Modified: trunk/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm
===================================================================
--- trunk/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm 2010-10-06 22:13:20 UTC (rev 18218)
+++ trunk/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm 2010-10-07 14:56:43 UTC (rev 18219)
@@ -7,6 +7,7 @@
use OpenSRF::Utils::JSON;
use OpenSRF::AppSession;
+use OpenSRF::MultiSession;
use OpenSRF::Utils::SettingsClient;
use OpenSRF::Utils::Logger qw/$logger/;
use OpenSRF::Utils qw/:datetime/;
@@ -21,8 +22,16 @@
my $log = 'OpenSRF::Utils::Logger';
+my $parallel_collect;
+my $parallel_react;
-sub initialize {}
+sub initialize {
+
+ my $conf = OpenSRF::Utils::SettingsClient->new;
+ $parallel_collect = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'collect') || 1;
+ $parallel_react = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'react') || 1;
+
+}
sub child_init {}
sub create_active_events_for_object {
@@ -619,6 +628,45 @@
api_level=> 1
);
+sub gather_events {
+ my $self = shift;
+ my $client = shift;
+ my $e_ids = shift;
+
+ $e_ids = [$e_ids] if (!ref($e_ids));
+
+ my @events;
+ for my $e_id (@$e_ids) {
+ my $e;
+ try {
+ $e = OpenILS::Application::Trigger::Event->new($e_id);
+ } catch Error with {
+ $logger->error("trigger: Event creation failed with ".shift());
+ };
+
+ next unless $e;
+
+ try {
+ $e->build_environment;
+ } catch Error with {
+ $logger->error("trigger: Event environment building failed with ".shift());
+ };
+
+ $e->editor->disconnect;
+ $e->environment->{EventProcessor} = undef; # remove circular ref for json encoding
+ $client->respond($e);
+ }
+
+ OpenILS::Application::Trigger::Event->ClearObjectCache();
+
+ return undef;
+}
+__PACKAGE__->register_method(
+ api_name => 'open-ils.trigger.event.gather',
+ method => 'gather_events',
+ api_level=> 1
+);
+
sub grouped_events {
my $self = shift;
my $client = shift;
@@ -636,27 +684,33 @@
return \%groups;
}
- for my $e_id ( @$events ) {
- $logger->info("trigger: processing event $e_id");
+ my @fleshed_events;
- # let the client know we're still chugging along TODO add osrf support for method_lookup $client's
- $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
+ if ($parallel_collect == 1 or @$events == 1) { # use method lookup
+ @fleshed_events = $self->method_lookup('open-ils.trigger.event.gather')->run($events);
+ } else {
+ my $self_multi = OpenSRF::MultiSession->new(
+ app => 'open-ils.trigger',
+ cap => $parallel_collect,
+ success_handler => sub {
+ my $self = shift;
+ my $req = shift;
- my $e;
- try {
- $e = OpenILS::Application::Trigger::Event->new($e_id);
- } catch Error with {
- $logger->error("trigger: Event creation failed with ".shift());
- };
+ push @fleshed_events,
+ map { OpenILS::Application::Trigger::Event->new($_) }
+ map { $_->content }
+ @{ $req->{response} };
+ },
+ );
- next unless $e;
+ $self_multi->request( 'open-ils.trigger.event.gather' => $_ ) for ( @$events );
+ $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
- try {
- $e->build_environment;
- } catch Error with {
- $logger->error("trigger: Event environment building failed with ".shift());
- };
+ $self_multi->session_wait(1);
+ $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
+ }
+ for my $e (@fleshed_events) {
if (my $group = $e->event->event_def->group_field) {
# split the grouping link steps
@@ -669,6 +723,10 @@
# get the grouping value for the grouping object on this event
my $ident_value = $node->$group_field();
+ if(ref $ident_value) {
+ my $ident_field = $ident_value->Identity;
+ $ident_value = $ident_value->$ident_field()
+ }
# push this event onto the event+grouping_value stack
$groups{$e->event->event_def->id}{$ident_value} ||= [];
@@ -677,11 +735,9 @@
# it's a non-grouped event
push @{ $groups{'*'} }, $e;
}
-
- $e->editor->disconnect;
}
- OpenILS::Application::Trigger::Event->ClearObjectCache();
+
return \%groups;
}
__PACKAGE__->register_method(
@@ -697,50 +753,77 @@
my $granflag = shift;
my ($groups) = $self->method_lookup('open-ils.trigger.event.find_pending_by_group')->run($granularity, $granflag);
- $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
+ $client->respond({"status" => "found"}) if (keys(%$groups) > 1 || @{$$groups{'*'}});
- # Could report on how the "found" events were grouped, but who's going to
- # consume that information?
-# for my $key (keys %$groups) {
-# if (@{ $$groups{$key} }) {
-# $client->respond({"status" => "found"});
-# last;
-# }
-# }
+ my $self_multi;
+ if ($parallel_react > 1 and (keys(%$groups) > 1 || @{$$groups{'*'}} > 1)) {
+ $self_multi = OpenSRF::MultiSession->new(
+ app => 'open-ils.trigger',
+ cap => $parallel_react,
+ session_hash_function => sub {
+ my $args = shift;
+ return $args->{target_id};
+ },
+ success_handler => sub {
+ my $me = shift;
+ my $req = shift;
+ $client->respond( $req->{response}->[0]->content );
+ }
+ );
+ }
for my $def ( keys %$groups ) {
if ($def eq '*') {
$logger->info("trigger: run_all_events firing un-grouped events");
for my $event ( @{ $$groups{'*'} } ) {
try {
- $client->respond(
- $self
- ->method_lookup('open-ils.trigger.event.fire')
- ->run($event)
- );
+ if ($self_multi) {
+ $event->environment->{EventProcessor} = undef; # remove circular ref for json encoding
+ $self_multi->request({target_id => $event->id}, 'open-ils.trigger.event.fire', $event);
+ } else {
+ $client->respond(
+ $self
+ ->method_lookup('open-ils.trigger.event.fire')
+ ->run($event)
+ );
+ }
} catch Error with {
$logger->error("trigger: event firing failed with ".shift());
};
}
- $logger->info("trigger: run_all_events completed firing un-grouped events");
+ $logger->info("trigger: run_all_events completed queuing un-grouped events");
+ $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
} else {
my $defgroup = $$groups{$def};
$logger->info("trigger: run_all_events firing events for grouped event def=$def");
for my $ident ( keys %$defgroup ) {
+ $logger->info("trigger: run_all_events firing group for grouped event def=$def and grp ident $ident");
try {
- $client->respond(
- $self
- ->method_lookup('open-ils.trigger.event_group.fire')
- ->run($$defgroup{$ident})
- );
+ if ($self_multi) {
+ $_->environment->{EventProcessor} = undef for @{$$defgroup{$ident}}; # remove circular ref for json encoding
+ $self_multi->request({target_id => $ident}, 'open-ils.trigger.event_group.fire', $$defgroup{$ident});
+ } else {
+ $client->respond(
+ $self
+ ->method_lookup('open-ils.trigger.event_group.fire')
+ ->run($$defgroup{$ident})
+ );
+ }
+ $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
} catch Error with {
$logger->error("trigger: event firing failed with ".shift());
};
}
- $logger->info("trigger: run_all_events completed firing events for grouped event def=$def");
+ $logger->info("trigger: run_all_events completed queuing events for grouped event def=$def");
}
}
+
+ $self_multi->session_wait(1) if ($self_multi);
+ $logger->info("trigger: run_all_events completed firing events");
+
+ $client->respond_complete();
+ return undef;
}
__PACKAGE__->register_method(
api_name => 'open-ils.trigger.event.run_all_pending',
More information about the open-ils-commits
mailing list