[Opensrf-commits] r1079 - in trunk/src/java/org/opensrf: . net/xmpp
test
svn at svn.open-ils.org
svn at svn.open-ils.org
Fri Aug 17 18:01:30 EDT 2007
Author: erickson
Date: 2007-08-17 17:57:16 -0400 (Fri, 17 Aug 2007)
New Revision: 1079
Added:
trunk/src/java/org/opensrf/test/TestThread.java
Modified:
trunk/src/java/org/opensrf/ClientSession.java
trunk/src/java/org/opensrf/Request.java
trunk/src/java/org/opensrf/Session.java
trunk/src/java/org/opensrf/Sys.java
trunk/src/java/org/opensrf/net/xmpp/XMPPSession.java
Log:
added multi-threaded client support to the opensrf network/xmpp layer
this is all managed below the covers so that clients can continue to safely
use bootstrapClient and will all "just work"
we now allow 1 xmpp connection per thread, as opposed to 1 per process.
added a test module
Modified: trunk/src/java/org/opensrf/ClientSession.java
===================================================================
--- trunk/src/java/org/opensrf/ClientSession.java 2007-08-17 21:50:01 UTC (rev 1078)
+++ trunk/src/java/org/opensrf/ClientSession.java 2007-08-17 21:57:16 UTC (rev 1079)
@@ -51,7 +51,7 @@
/** create a random thread */
long time = new Date().getTime();
Random rand = new Random(time);
- setThread(rand.nextInt()+""+rand.nextInt()+""+time);
+ setThread(rand.nextInt()+""+rand.nextInt()+""+time+Thread.currentThread().getId());
nextId = 0;
requests = new HashMap<Integer, Request>();
@@ -115,6 +115,7 @@
Request req = findRequest(msg.getId());
if(req == null) {
/** LOG that we've received a result to a non-existant request */
+ System.err.println(msg.getId() +" has no corresponding request");
return;
}
OSRFObject payload = (OSRFObject) msg.get("payload");
Modified: trunk/src/java/org/opensrf/Request.java
===================================================================
--- trunk/src/java/org/opensrf/Request.java 2007-08-17 21:50:01 UTC (rev 1078)
+++ trunk/src/java/org/opensrf/Request.java 2007-08-17 21:57:16 UTC (rev 1079)
@@ -73,11 +73,16 @@
Result result = null;
+ if((result = resultQueue.poll()) != null)
+ return result;
+
if(millis < 0 && !complete) {
/** wait potentially forever for a result to arrive */
- session.waitForMessage(millis);
- if((result = resultQueue.poll()) != null)
- return result;
+ while(!complete) {
+ session.waitForMessage(millis);
+ if((result = resultQueue.poll()) != null)
+ return result;
+ }
} else {
Modified: trunk/src/java/org/opensrf/Session.java
===================================================================
--- trunk/src/java/org/opensrf/Session.java 2007-08-17 21:50:01 UTC (rev 1078)
+++ trunk/src/java/org/opensrf/Session.java 2007-08-17 21:57:16 UTC (rev 1079)
@@ -45,10 +45,9 @@
xmsg.setTo(remoteNode);
xmsg.setThread(thread);
xmsg.setBody(new JSONWriter(Arrays.asList(new Message[] {omsg})).write());
- XMPPSession ses = XMPPSession.getGlobalSession();
try {
- XMPPSession.getGlobalSession().send(xmsg);
+ XMPPSession.getThreadSession().send(xmsg);
} catch(XMPPException e) {
connectState = ConnectState.DISCONNECTED;
throw new SessionException("Error sending message to " + remoteNode, e);
@@ -63,7 +62,7 @@
public static void waitForMessage(long millis) throws SessionException, MethodException {
try {
Stack.processXMPPMessage(
- XMPPSession.getGlobalSession().recv(millis));
+ XMPPSession.getThreadSession().recv(millis));
} catch(XMPPException e) {
throw new SessionException("Error waiting for message", e);
}
Modified: trunk/src/java/org/opensrf/Sys.java
===================================================================
--- trunk/src/java/org/opensrf/Sys.java 2007-08-17 21:50:01 UTC (rev 1078)
+++ trunk/src/java/org/opensrf/Sys.java 2007-08-17 21:57:16 UTC (rev 1079)
@@ -2,6 +2,9 @@
import org.opensrf.util.*;
import org.opensrf.net.xmpp.*;
+import java.util.Random;
+import java.util.Date;
+import java.net.InetAddress;
public class Sys {
@@ -16,6 +19,10 @@
public static void bootstrapClient(String configFile, String configContext)
throws ConfigException, SessionException {
+ /** see if the current thread already has a connection */
+ if(XMPPSession.getThreadSession() != null)
+ return;
+
/** create the config parser */
Config config = new Config(configContext);
config.parse(configFile);
@@ -27,11 +34,24 @@
String host = (String) config.getFirst("/domains/domain");
int port = config.getInt("/port");
+
+ /** Create a random login resource string */
+ String res = "java_";
try {
+ res += InetAddress.getLocalHost().getHostAddress();
+ } catch(java.net.UnknownHostException e) {}
+ res += "_"+Math.abs(new Random(new Date().getTime()).nextInt())
+ + "_t"+ Thread.currentThread().getId();
+
+
+ try {
+
/** Connect to the Jabber network */
XMPPSession xses = new XMPPSession(host, port);
- xses.connect(username, passwd, "test-java"); /* XXX */
- XMPPSession.setGlobalSession(xses);
+ System.out.println("resource = " + res);
+ xses.connect(username, passwd, res);
+ XMPPSession.setThreadSession(xses);
+
} catch(XMPPException e) {
throw new SessionException("Unable to bootstrap client", e);
}
@@ -41,7 +61,7 @@
* Shuts down the connection to the opensrf network
*/
public static void shutdown() {
- XMPPSession.getGlobalSession().disconnect();
+ XMPPSession.getThreadSession().disconnect();
}
}
Modified: trunk/src/java/org/opensrf/net/xmpp/XMPPSession.java
===================================================================
--- trunk/src/java/org/opensrf/net/xmpp/XMPPSession.java 2007-08-17 21:50:01 UTC (rev 1078)
+++ trunk/src/java/org/opensrf/net/xmpp/XMPPSession.java 2007-08-17 21:57:16 UTC (rev 1079)
@@ -2,6 +2,9 @@
import java.io.*;
import java.net.Socket;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
/**
@@ -21,6 +24,8 @@
public static final String JABBER_DISCONNECT = "</stream:stream>";
+ private static Map threadConnections = new ConcurrentHashMap();
+
/** jabber domain */
private String host;
/** jabber port */
@@ -59,16 +64,56 @@
/**
* Returns the global, process-wide session
*/
+ /*
public static XMPPSession getGlobalSession() {
return globalSession;
}
+ */
+ public static XMPPSession getThreadSession() {
+ return (XMPPSession) threadConnections.get(new Long(Thread.currentThread().getId()));
+ }
+
/**
+ * Sets the given session as the global session for the current thread
+ * @param ses The session
+ */
+ public static void setThreadSession(XMPPSession ses) {
+ /* every time we create a new connection, clean up any dead threads.
+ * this is cheaper than cleaning up the dead threads at every access. */
+ cleanupThreadSessions();
+ threadConnections.put(new Long(Thread.currentThread().getId()), ses);
+ }
+
+ /**
+ * Analyzes the threadSession data to see if there are any sessions
+ * whose controlling thread has gone away.
+ */
+ private static void cleanupThreadSessions() {
+ Thread threads[] = new Thread[Thread.activeCount()];
+ Thread.enumerate(threads);
+ for(Iterator i = threadConnections.keySet().iterator(); i.hasNext(); ) {
+ boolean found = false;
+ Long id = (Long) i.next();
+ for(Thread t : threads) {
+ if(t.getId() == id.longValue()) {
+ found = true;
+ break;
+ }
+ }
+ if(!found)
+ threadConnections.remove(id);
+ }
+ }
+
+ /**
* Sets the global, process-wide section
*/
+ /*
public static void setGlobalSession(XMPPSession ses) {
globalSession = ses;
}
+ */
/** true if this session is connected to the server */
@@ -190,6 +235,8 @@
} else {
while(timeout >= 0) { /* wait at most 'timeout' milleseconds for a message to arrive */
+ msg = reader.popMessageQueue();
+ if( msg != null ) return msg;
timeout -= reader.waitCoreEvent(timeout);
msg = reader.popMessageQueue();
if( msg != null ) return msg;
@@ -197,7 +244,7 @@
}
}
- return null;
+ return reader.popMessageQueue();
}
Added: trunk/src/java/org/opensrf/test/TestThread.java
===================================================================
--- trunk/src/java/org/opensrf/test/TestThread.java (rev 0)
+++ trunk/src/java/org/opensrf/test/TestThread.java 2007-08-17 21:57:16 UTC (rev 1079)
@@ -0,0 +1,68 @@
+package org.opensrf.test;
+import org.opensrf.*;
+import org.opensrf.util.*;
+import java.util.Map;
+import java.util.Date;
+import java.util.List;
+import java.util.ArrayList;
+import java.io.PrintStream;
+
+/**
+ * Connects to the opensrf network once per thread and runs
+ * and runs a series of request acccross all launched threads.
+ * The purpose is to verify that the java threaded client api
+ * is functioning as expected
+ */
+public class TestThread implements Runnable {
+
+ String args[];
+
+ public TestThread(String args[]) {
+ this.args = args;
+ }
+
+ public void run() {
+
+ try {
+
+ Sys.bootstrapClient(args[0], "/config/opensrf");
+ ClientSession session = new ClientSession(args[3]);
+
+ List params = new ArrayList<Object>();
+ for(int i = 5; i < args.length; i++)
+ params.add(new JSONReader(args[3]).read());
+
+ for(int i = 0; i < Integer.parseInt(args[2]); i++) {
+ System.out.println("thread " + Thread.currentThread().getId()+" sending request " + i);
+ Request request = session.request(args[4], params);
+ Result result = request.recv(3000);
+ if(result != null) {
+ System.out.println("thread " + Thread.currentThread().getId()+
+ " got result JSON: " + new JSONWriter(result.getContent()).write());
+ } else {
+ System.out.println("* thread " + Thread.currentThread().getId()+ " got NO result");
+ }
+ }
+
+ Sys.shutdown();
+ } catch(Exception e) {
+ System.err.println(e);
+ }
+ }
+
+ public static void main(String args[]) throws Exception {
+
+ if(args.length < 5) {
+ System.out.println( "usage: org.opensrf.test.TestClient "+
+ "<osrfConfigFile> <numthreads> <numiter> <service> <method> [<JSONparam1>, <JSONparam2>]");
+ return;
+ }
+
+ int numThreads = Integer.parseInt(args[1]);
+ for(int i = 0; i < numThreads; i++)
+ new Thread(new TestThread(args)).start();
+ }
+}
+
+
+
More information about the opensrf-commits
mailing list