[open-ils-commits] r18350 - in branches/rel_2_0/Open-ILS/src: perlmods/OpenILS/Application perlmods/OpenILS/Application/Trigger support-scripts (erickson)
svn at svn.open-ils.org
svn at svn.open-ils.org
Thu Oct 14 16:49:04 EDT 2010
Author: erickson
Date: 2010-10-14 16:48:58 -0400 (Thu, 14 Oct 2010)
New Revision: 18350
Modified:
branches/rel_2_0/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm
branches/rel_2_0/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm
branches/rel_2_0/Open-ILS/src/support-scripts/action_trigger_runner.pl
Log:
Back-porting a number of action/trigger fixes from trunk:
- parallel event collection and reaction, using OpenSRF::MultiSession
- fewer transaction (xact) begin/rollback calls, to reduce call overhead during collection
- force granularity-only mode when running with --granularity
Modified: branches/rel_2_0/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm
===================================================================
--- branches/rel_2_0/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm 2010-10-14 20:17:00 UTC (rev 18349)
+++ branches/rel_2_0/Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm 2010-10-14 20:48:58 UTC (rev 18350)
@@ -2,13 +2,10 @@
use strict; use warnings;
use OpenSRF::EX qw/:try/;
use OpenSRF::Utils::JSON;
-
use OpenSRF::Utils::Logger qw/$logger/;
-
use OpenILS::Utils::Fieldmapper;
use OpenILS::Utils::CStoreEditor q/:funcs/;
use OpenILS::Application::Trigger::ModRunner;
-
use Safe;
my $log = 'OpenSRF::Utils::Logger';
@@ -19,11 +16,17 @@
my $editor = shift;
$class = ref($class) || $class;
- return $id if (ref($id) && ref($id) eq $class);
-
my $standalone = $editor ? 0 : 1;
$editor ||= new_editor();
+ if (ref($id) && ref($id) eq $class) {
+ $id->environment->{EventProcessor} = $id
+ if ($id->environment->{complete}); # in case it came over an opensrf tube
+ $id->editor( $editor );
+ $id->standalone( $standalone );
+ return $id;
+ }
+
my $self = bless { id => $id, editor => $editor, standalone => $standalone } => $class;
return $self->init()
@@ -109,6 +112,11 @@
$self->editor->xact_rollback || return undef;
}
+ unless($self->target) {
+ $self->update_state('invalid');
+ $self->valid(0);
+ }
+
return $self;
}
@@ -473,11 +481,16 @@
my $collector = shift;
my $label = shift;
my $path = shift;
+ my $ed = shift;
+ my $outer = 0;
+ if (!$ed) {
+ $ed = new_editor(xact=>1);
+ $outer = 1;
+ }
my $step = shift(@$path);
-
my $fhint = Fieldmapper->publish_fieldmapper->{$context->class_name}{links}{$step}{class};
my $fclass = $self->_fm_class_by_hint( $fhint );
@@ -501,10 +514,6 @@
$meth =~ s/Fieldmapper:://;
$meth =~ s/::/_/g;
- my $ed = grep( /open-ils.cstore/, @{$fclass->Controller} ) ?
- $self->editor :
- new_rstore_editor(($self->standalone ? () : (xact=>1)));
-
my $obj = $context->$step();
$logger->debug(
@@ -521,18 +530,10 @@
my $def_id = $self->event->event_def->id;
my $str_path = join('.', @$path);
- if ($self->standalone) {
- $ed->xact_begin || return undef;
- }
-
$obj = $_object_by_path_cache{$def_id}{$str_path}{$step}{$ffield}{$lval} ||
$ed->$meth( ($multi) ? { $ffield => $lval } : $lval);
$_object_by_path_cache{$def_id}{$str_path}{$step}{$ffield}{$lval} ||= $obj;
-
- if ($self->standalone) {
- $ed->xact_rollback || return undef;
- }
}
}
@@ -547,7 +548,7 @@
for (@$obj_list) {
my @path_clone = @$path;
- $self->_object_by_path( $_, $collector, $label, \@path_clone );
+ $self->_object_by_path( $_, $collector, $label, \@path_clone, $ed );
}
$obj = $$obj_list[0] if (!$multi || $rtype eq 'might_have');
@@ -590,6 +591,7 @@
}
}
+ $ed->rollback if ($outer);
return $obj;
}
Modified: branches/rel_2_0/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm
===================================================================
--- branches/rel_2_0/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm 2010-10-14 20:17:00 UTC (rev 18349)
+++ branches/rel_2_0/Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm 2010-10-14 20:48:58 UTC (rev 18350)
@@ -7,6 +7,7 @@
use OpenSRF::Utils::JSON;
use OpenSRF::AppSession;
+use OpenSRF::MultiSession;
use OpenSRF::Utils::SettingsClient;
use OpenSRF::Utils::Logger qw/$logger/;
use OpenSRF::Utils qw/:datetime/;
@@ -21,8 +22,16 @@
my $log = 'OpenSRF::Utils::Logger';
+my $parallel_collect;
+my $parallel_react;
-sub initialize {}
+sub initialize {
+
+ my $conf = OpenSRF::Utils::SettingsClient->new;
+ $parallel_collect = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'collect') || 1;
+ $parallel_react = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'react') || 1;
+
+}
sub child_init {}
sub create_active_events_for_object {
@@ -619,6 +628,45 @@
api_level=> 1
);
+sub gather_events {
+ my $self = shift;
+ my $client = shift;
+ my $e_ids = shift;
+
+ $e_ids = [$e_ids] if (!ref($e_ids));
+
+ my @events;
+ for my $e_id (@$e_ids) {
+ my $e;
+ try {
+ $e = OpenILS::Application::Trigger::Event->new($e_id);
+ } catch Error with {
+ $logger->error("trigger: Event creation failed with ".shift());
+ };
+
+ next if !$e or $e->event->state eq 'invalid';
+
+ try {
+ $e->build_environment;
+ } catch Error with {
+ $logger->error("trigger: Event environment building failed with ".shift());
+ };
+
+ $e->editor->disconnect;
+ $e->environment->{EventProcessor} = undef; # remove circular ref for json encoding
+ $client->respond($e);
+ }
+
+ OpenILS::Application::Trigger::Event->ClearObjectCache();
+
+ return undef;
+}
+__PACKAGE__->register_method(
+ api_name => 'open-ils.trigger.event.gather',
+ method => 'gather_events',
+ api_level=> 1
+);
+
sub grouped_events {
my $self = shift;
my $client = shift;
@@ -636,39 +684,56 @@
return \%groups;
}
- for my $e_id ( @$events ) {
- $logger->info("trigger: processing event $e_id");
+ my @fleshed_events;
- # let the client know we're still chugging along TODO add osrf support for method_lookup $client's
- $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
+ if ($parallel_collect == 1 or @$events == 1) { # use method lookup
+ @fleshed_events = $self->method_lookup('open-ils.trigger.event.gather')->run($events);
+ } else {
+ my $self_multi = OpenSRF::MultiSession->new(
+ app => 'open-ils.trigger',
+ cap => $parallel_collect,
+ success_handler => sub {
+ my $self = shift;
+ my $req = shift;
- my $e;
- try {
- $e = OpenILS::Application::Trigger::Event->new($e_id);
- } catch Error with {
- $logger->error("trigger: Event creation failed with ".shift());
- };
+ push @fleshed_events,
+ map { OpenILS::Application::Trigger::Event->new($_) }
+ map { $_->content }
+ @{ $req->{response} };
+ },
+ );
- next unless $e;
+ $self_multi->request( 'open-ils.trigger.event.gather' => $_ ) for ( @$events );
+ $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
- try {
- $e->build_environment;
- } catch Error with {
- $logger->error("trigger: Event environment building failed with ".shift());
- };
+ $self_multi->session_wait(1);
+ $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
+ }
+ for my $e (@fleshed_events) {
if (my $group = $e->event->event_def->group_field) {
# split the grouping link steps
my @steps = split /\./, $group;
my $group_field = pop(@steps); # we didn't flesh to this, it's a field not an object
- # find the grouping object
- my $node = $e->target;
- $node = $node->$_() for ( @steps );
+ my $node;
+ eval {
+ $node = $e->target;
+ $node = $node->$_() for ( @steps );
+ };
+ unless($node) { # should not get here, but to be safe..
+ $e->update_state('invalid');
+ next;
+ }
+
# get the grouping value for the grouping object on this event
my $ident_value = $node->$group_field();
+ if(ref $ident_value) {
+ my $ident_field = $ident_value->Identity;
+ $ident_value = $ident_value->$ident_field()
+ }
# push this event onto the event+grouping_value stack
$groups{$e->event->event_def->id}{$ident_value} ||= [];
@@ -677,11 +742,9 @@
# it's a non-grouped event
push @{ $groups{'*'} }, $e;
}
-
- $e->editor->disconnect;
}
- OpenILS::Application::Trigger::Event->ClearObjectCache();
+
return \%groups;
}
__PACKAGE__->register_method(
@@ -697,50 +760,77 @@
my $granflag = shift;
my ($groups) = $self->method_lookup('open-ils.trigger.event.find_pending_by_group')->run($granularity, $granflag);
- $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
+ $client->respond({"status" => "found"}) if (keys(%$groups) > 1 || @{$$groups{'*'}});
- # Could report on how the "found" events were grouped, but who's going to
- # consume that information?
-# for my $key (keys %$groups) {
-# if (@{ $$groups{$key} }) {
-# $client->respond({"status" => "found"});
-# last;
-# }
-# }
+ my $self_multi;
+ if ($parallel_react > 1 and (keys(%$groups) > 1 || @{$$groups{'*'}} > 1)) {
+ $self_multi = OpenSRF::MultiSession->new(
+ app => 'open-ils.trigger',
+ cap => $parallel_react,
+ session_hash_function => sub {
+ my $args = shift;
+ return $args->{target_id};
+ },
+ success_handler => sub {
+ my $me = shift;
+ my $req = shift;
+ $client->respond( $req->{response}->[0]->content );
+ }
+ );
+ }
for my $def ( keys %$groups ) {
if ($def eq '*') {
$logger->info("trigger: run_all_events firing un-grouped events");
for my $event ( @{ $$groups{'*'} } ) {
try {
- $client->respond(
- $self
- ->method_lookup('open-ils.trigger.event.fire')
- ->run($event)
- );
+ if ($self_multi) {
+ $event->environment->{EventProcessor} = undef; # remove circular ref for json encoding
+ $self_multi->request({target_id => $event->id}, 'open-ils.trigger.event.fire', $event);
+ } else {
+ $client->respond(
+ $self
+ ->method_lookup('open-ils.trigger.event.fire')
+ ->run($event)
+ );
+ }
} catch Error with {
$logger->error("trigger: event firing failed with ".shift());
};
}
- $logger->info("trigger: run_all_events completed firing un-grouped events");
+ $logger->info("trigger: run_all_events completed queuing un-grouped events");
+ $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
} else {
my $defgroup = $$groups{$def};
$logger->info("trigger: run_all_events firing events for grouped event def=$def");
for my $ident ( keys %$defgroup ) {
+ $logger->info("trigger: run_all_events firing group for grouped event def=$def and grp ident $ident");
try {
- $client->respond(
- $self
- ->method_lookup('open-ils.trigger.event_group.fire')
- ->run($$defgroup{$ident})
- );
+ if ($self_multi) {
+ $_->environment->{EventProcessor} = undef for @{$$defgroup{$ident}}; # remove circular ref for json encoding
+ $self_multi->request({target_id => $ident}, 'open-ils.trigger.event_group.fire', $$defgroup{$ident});
+ } else {
+ $client->respond(
+ $self
+ ->method_lookup('open-ils.trigger.event_group.fire')
+ ->run($$defgroup{$ident})
+ );
+ }
+ $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
} catch Error with {
$logger->error("trigger: event firing failed with ".shift());
};
}
- $logger->info("trigger: run_all_events completed firing events for grouped event def=$def");
+ $logger->info("trigger: run_all_events completed queuing events for grouped event def=$def");
}
}
+
+ $self_multi->session_wait(1) if ($self_multi);
+ $logger->info("trigger: run_all_events completed firing events");
+
+ $client->respond_complete();
+ return undef;
}
__PACKAGE__->register_method(
api_name => 'open-ils.trigger.event.run_all_pending',
Modified: branches/rel_2_0/Open-ILS/src/support-scripts/action_trigger_runner.pl
===================================================================
--- branches/rel_2_0/Open-ILS/src/support-scripts/action_trigger_runner.pl 2010-10-14 20:17:00 UTC (rev 18349)
+++ branches/rel_2_0/Open-ILS/src/support-scripts/action_trigger_runner.pl 2010-10-14 20:48:58 UTC (rev 18350)
@@ -58,6 +58,9 @@
my $max_sleep = $opt_max_sleep;
+#XXX need to figure out why this is required...
+$opt_gran_only = $opt_granularity ? 1 : 0;
+
$opt_lockfile .= '.' . $opt_granularity if ($opt_granularity && $opt_gran_only);
# typical passive hook filters
More information about the open-ils-commits
mailing list