[Opensrf-commits] r1081 - in trunk/src/python: . osrf
svn at svn.open-ils.org
svn at svn.open-ils.org
Sat Aug 18 21:20:26 EDT 2007
Author: erickson
Date: 2007-08-18 21:16:02 -0400 (Sat, 18 Aug 2007)
New Revision: 1081
Modified:
trunk/src/python/osrf/json.py
trunk/src/python/osrf/log.py
trunk/src/python/osrf/net.py
trunk/src/python/osrf/ses.py
trunk/src/python/osrf/stack.py
trunk/src/python/osrf/system.py
trunk/src/python/srfsh.py
Log:
added support for multi-threaded client interactions. much like the java lib, each thread is allowed 1 jabber connection, as opposed to 1 process-wide jabber connection. also did some re-tabbing to force 4-space tabs (not raw tabs) - more of those to come
Modified: trunk/src/python/osrf/json.py
===================================================================
--- trunk/src/python/osrf/json.py 2007-08-17 21:57:31 UTC (rev 1080)
+++ trunk/src/python/osrf/json.py 2007-08-19 01:16:02 UTC (rev 1081)
@@ -92,7 +92,7 @@
for c in json:
- if eatws: # simpljson adds a pesky after array and object items
+ if eatws: # simpljson adds a pesky space after array and object items
if c == ' ':
continue
Modified: trunk/src/python/osrf/log.py
===================================================================
--- trunk/src/python/osrf/log.py 2007-08-17 21:57:31 UTC (rev 1080)
+++ trunk/src/python/osrf/log.py 2007-08-19 01:16:02 UTC (rev 1081)
@@ -68,6 +68,9 @@
if level == OSRF_LOG_WARN: lvl = 'WARN'; slvl=syslog.LOG_WARNING
if level == OSRF_LOG_ERR: lvl = 'ERR '; slvl=syslog.LOG_ERR
+
+ # XXX when file logging is implemented, wrap io in a semaphore for thread safety
+
file = frgx.sub('',tb[0])
msg = '[%s:%d:%s:%s] %s' % (lvl, os.getpid(), file, tb[1], msg)
syslog.syslog(slvl, msg)
Modified: trunk/src/python/osrf/net.py
===================================================================
--- trunk/src/python/osrf/net.py 2007-08-17 21:57:31 UTC (rev 1080)
+++ trunk/src/python/osrf/net.py 2007-08-19 01:16:02 UTC (rev 1081)
@@ -19,129 +19,128 @@
from pyxmpp.jid import JID
from socket import gethostname
from osrf.log import *
-import os, time
+import os, time, threading
import logging
+threadSessions = {}
+
# - log jabber activity (for future reference)
#logger=logging.getLogger()
#logger.addHandler(logging.StreamHandler())
#logger.addHandler(logging.FileHandler('j.log'))
#logger.setLevel(logging.DEBUG)
-__network = None
def osrfSetNetworkHandle(handle):
- """Sets the global network connection handle."""
- global __network
- __network = handle
+ """ Sets the thread-specific network handle"""
+ threadSessions[threading.currentThread().getName()] = handle
def osrfGetNetworkHandle():
- """Returns the global network connection handle."""
- global __network
- return __network
+ """ Returns the thread-specific network connection handle."""
+ return threadSessions.get(threading.currentThread().getName())
class osrfNetworkMessage(object):
- """Network message
+ """Network message
- attributes:
+ attributes:
- sender - message sender
- to - message recipient
- body - the body of the message
- thread - the message thread
- """
+ sender - message sender
+ to - message recipient
+ body - the body of the message
+ thread - the message thread
+ """
- def __init__(self, message=None, **args):
- if message:
- self.body = message.get_body()
- self.thread = message.get_thread()
- self.to = message.get_to()
- if message.xmlnode.hasProp('router_from') and message.xmlnode.prop('router_from') != '':
- self.sender = message.xmlnode.prop('router_from')
- else: self.sender = message.get_from().as_utf8()
- else:
- if args.has_key('sender'): self.sender = args['sender']
- if args.has_key('to'): self.to = args['to']
- if args.has_key('body'): self.body = args['body']
- if args.has_key('thread'): self.thread = args['thread']
+ def __init__(self, message=None, **args):
+ if message:
+ self.body = message.get_body()
+ self.thread = message.get_thread()
+ self.to = message.get_to()
+ if message.xmlnode.hasProp('router_from') and message.xmlnode.prop('router_from') != '':
+ self.sender = message.xmlnode.prop('router_from')
+ else: self.sender = message.get_from().as_utf8()
+ else:
+ if args.has_key('sender'): self.sender = args['sender']
+ if args.has_key('to'): self.to = args['to']
+ if args.has_key('body'): self.body = args['body']
+ if args.has_key('thread'): self.thread = args['thread']
class osrfNetwork(JabberClient):
- def __init__(self, **args):
- self.isconnected = False
+ def __init__(self, **args):
+ self.isconnected = False
- # Create a unique jabber resource
- resource = 'osrf_client'
- if args.has_key('resource'):
- resource = args['resource']
- resource += '_' + gethostname()+':'+ str(os.getpid())
- self.jid = JID(args['username'], args['host'], resource)
+ # Create a unique jabber resource
+ resource = 'python_'
+ if args.has_key('resource'):
+ resource = args['resource']
+ resource += '_' + gethostname()+':'+ str(os.getpid()) + '_'+ threading.currentThread().getName().lower()
+ self.jid = JID(args['username'], args['host'], resource)
- osrfLogDebug("initializing network with JID %s and host=%s, port=%s, username=%s" %
- (self.jid.as_utf8(), args['host'], args['port'], args['username']))
+ osrfLogDebug("initializing network with JID %s and host=%s, port=%s, username=%s" %
+ (self.jid.as_utf8(), args['host'], args['port'], args['username']))
- #initialize the superclass
- JabberClient.__init__(self, self.jid, args['password'], args['host'])
- self.queue = []
+ #initialize the superclass
+ JabberClient.__init__(self, self.jid, args['password'], args['host'])
+ self.queue = []
- def connect(self):
- JabberClient.connect(self)
- while not self.isconnected:
- stream = self.get_stream()
- act = stream.loop_iter(10)
- if not act: self.idle()
+ def connect(self):
+ JabberClient.connect(self)
+ while not self.isconnected:
+ stream = self.get_stream()
+ act = stream.loop_iter(10)
+ if not act: self.idle()
- def setRecvCallback(self, func):
- """The callback provided is called when a message is received.
-
- The only argument to the function is the received message. """
- self.recvCallback = func
+ def setRecvCallback(self, func):
+ """The callback provided is called when a message is received.
+
+ The only argument to the function is the received message. """
+ self.recvCallback = func
- def session_started(self):
- osrfLogInfo("Successfully connected to the opensrf network")
- self.authenticated()
- self.stream.set_message_handler("normal",self.message_received)
- self.isconnected = True
+ def session_started(self):
+ osrfLogInfo("Successfully connected to the opensrf network")
+ self.authenticated()
+ self.stream.set_message_handler("normal",self.message_received)
+ self.isconnected = True
- def send(self, message):
- """Sends the provided network message."""
- osrfLogInternal("jabber sending to %s: %s" % (message.to, message.body))
- msg = Message(None, None, message.to, None, None, None, message.body, message.thread)
- self.stream.send(msg)
-
- def message_received(self, stanza):
- """Handler for received messages."""
- osrfLogInternal("jabber received a message of type %s" % stanza.get_type())
- if stanza.get_type()=="headline":
- return True
- # check for errors
- osrfLogInternal("jabber received message from %s : %s"
- % (stanza.get_from().as_utf8(), stanza.get_body()))
- self.queue.append(osrfNetworkMessage(stanza))
- return True
+ def send(self, message):
+ """Sends the provided network message."""
+ osrfLogInternal("jabber sending to %s: %s" % (message.to, message.body))
+ msg = Message(None, None, message.to, None, None, None, message.body, message.thread)
+ self.stream.send(msg)
+
+ def message_received(self, stanza):
+ """Handler for received messages."""
+ osrfLogInternal("jabber received a message of type %s" % stanza.get_type())
+ if stanza.get_type()=="headline":
+ return True
+ # check for errors
+ osrfLogInternal("jabber received message from %s : %s"
+ % (stanza.get_from().as_utf8(), stanza.get_body()))
+ self.queue.append(osrfNetworkMessage(stanza))
+ return True
- def recv(self, timeout=120):
- """Attempts to receive a message from the network.
+ def recv(self, timeout=120):
+ """Attempts to receive a message from the network.
- timeout - max number of seconds to wait for a message.
- If no message is received in 'timeout' seconds, None is returned. """
+ timeout - max number of seconds to wait for a message.
+ If no message is received in 'timeout' seconds, None is returned. """
- msg = None
- if len(self.queue) == 0:
- while timeout >= 0 and len(self.queue) == 0:
- starttime = time.time()
- osrfLogInternal("going into stream loop at " + str(starttime))
- act = self.get_stream().loop_iter(timeout)
- endtime = time.time() - starttime
- timeout -= endtime
- osrfLogInternal("exiting stream loop after %s seconds" % str(endtime))
- osrfLogInternal("act = %s : queue length = %d" % (act, len(self.queue)) )
- if not act: self.idle()
+ msg = None
+ if len(self.queue) == 0:
+ while timeout >= 0 and len(self.queue) == 0:
+ starttime = time.time()
+ osrfLogInternal("going into stream loop at " + str(starttime))
+ act = self.get_stream().loop_iter(timeout)
+ endtime = time.time() - starttime
+ timeout -= endtime
+ osrfLogInternal("exiting stream loop after %s seconds" % str(endtime))
+ osrfLogInternal("act = %s : queue length = %d" % (act, len(self.queue)) )
+ if not act: self.idle()
- # if we've acquired a message, handle it
- if len(self.queue) > 0:
- self.recvCallback(self.queue.pop(0))
- return None
+ # if we've acquired a message, handle it
+ if len(self.queue) > 0:
+ self.recvCallback(self.queue.pop(0))
+ return None
Modified: trunk/src/python/osrf/ses.py
===================================================================
--- trunk/src/python/osrf/ses.py 2007-08-17 21:57:31 UTC (rev 1080)
+++ trunk/src/python/osrf/ses.py 2007-08-19 01:16:02 UTC (rev 1081)
@@ -19,7 +19,7 @@
from osrf.net import osrfNetworkMessage, osrfGetNetworkHandle
from osrf.log import *
from osrf.const import *
-import random, sys, os, time
+import random, sys, os, time, threading
# -----------------------------------------------------------------------
@@ -35,10 +35,16 @@
class osrfSession(object):
"""Abstract session superclass."""
+ ''' Global cache of in-service sessions '''
+ sessionCache = {}
+
def __init__(self):
# by default, we're connected to no one
self.state = OSRF_APP_SESSION_DISCONNECTED
+ def findSession(threadTrace):
+ return osrfSession.sessionCache.get(threadTrace)
+ findSession = staticmethod(findSession)
def wait(self, timeout=120):
"""Wait up to <timeout> seconds for data to arrive on the network"""
@@ -58,7 +64,7 @@
def cleanup(self):
"""Removes the session from the global session cache."""
- del osrfClientSession.sessionCache[self.thread]
+ del osrfSession.sessionCache[self.thread]
class osrfClientSession(osrfSession):
"""Client session object. Use this to make server requests."""
@@ -78,7 +84,8 @@
self.origRemoteId = self.remoteId
# generate a random message thread
- self.thread = "%s%s%s" % (os.getpid(), str(random.randint(100,100000)), str(time.time()))
+ self.thread = "%s%s%s%s" % (os.getpid(),
+ str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
# how many requests this session has taken part in
self.nextId = 0
@@ -87,7 +94,7 @@
self.requests = {}
# cache this session in the global session cache
- osrfClientSession.sessionCache[self.thread] = self
+ osrfSession.sessionCache[self.thread] = self
def resetRequestTimeout(self, rid):
req = self.findRequest(rid)
@@ -190,13 +197,6 @@
-osrfSession.sessionCache = {}
-def osrfFindSession(thread):
- """Finds a session in the global cache."""
- try:
- return osrfClientSession.sessionCache[thread]
- except: return None
-
class osrfRequest(object):
"""Represents a single OpenSRF request.
A request is made and any resulting respones are
Modified: trunk/src/python/osrf/stack.py
===================================================================
--- trunk/src/python/osrf/stack.py 2007-08-17 21:57:31 UTC (rev 1080)
+++ trunk/src/python/osrf/stack.py 2007-08-19 01:16:02 UTC (rev 1081)
@@ -16,13 +16,13 @@
from osrf.json import *
from osrf.log import *
from osrf.ex import *
-from osrf.ses import osrfFindSession, osrfClientSession, osrfServerSession
+from osrf.ses import osrfSession, osrfClientSession, osrfServerSession
from osrf.const import *
from time import time
def osrfPushStack(netMessage):
- ses = osrfFindSession(netMessage.thread)
+ ses = osrfSession.findSession(netMessage.thread)
if not ses:
# This is an incoming request from a client, create a new server session
Modified: trunk/src/python/osrf/system.py
===================================================================
--- trunk/src/python/osrf/system.py 2007-08-17 21:57:31 UTC (rev 1080)
+++ trunk/src/python/osrf/system.py 2007-08-19 01:16:02 UTC (rev 1081)
@@ -49,3 +49,5 @@
+
+
Modified: trunk/src/python/srfsh.py
===================================================================
--- trunk/src/python/srfsh.py 2007-08-17 21:57:31 UTC (rev 1080)
+++ trunk/src/python/srfsh.py 2007-08-19 01:16:02 UTC (rev 1081)
@@ -11,73 +11,73 @@
# main listen loop
# -------------------------------------------------------------------
def do_loop():
- while True:
+ while True:
- try:
- #line = raw_input("srfsh% ")
- line = raw_input("\033[01;32msrfsh\033[01;34m% \033[00m")
- if not len(line):
- continue
- if lower(line) == 'exit' or lower(line) == 'quit':
- break
- parts = split(line)
+ try:
+ #line = raw_input("srfsh% ")
+ line = raw_input("\033[01;32msrfsh\033[01;34m% \033[00m")
+ if not len(line):
+ continue
+ if lower(line) == 'exit' or lower(line) == 'quit':
+ break
+ parts = split(line)
- command = parts[0]
-
- if command == 'request':
- parts.pop(0)
- handle_request(parts)
- continue
+ command = parts[0]
+
+ if command == 'request':
+ parts.pop(0)
+ handle_request(parts)
+ continue
- if command == 'math_bench':
- parts.pop(0)
- handle_math_bench(parts)
- continue
+ if command == 'math_bench':
+ parts.pop(0)
+ handle_math_bench(parts)
+ continue
- if command == 'help':
- handle_help()
- continue
+ if command == 'help':
+ handle_help()
+ continue
- if command == 'set':
- parts.pop(0)
- handle_set(parts)
+ if command == 'set':
+ parts.pop(0)
+ handle_set(parts)
- if command == 'get':
- parts.pop(0)
- handle_get(parts)
+ if command == 'get':
+ parts.pop(0)
+ handle_get(parts)
- except KeyboardInterrupt:
- print ""
+ except KeyboardInterrupt:
+ print ""
- except EOFError:
- print "exiting..."
- sys.exit(0)
+ except EOFError:
+ print "exiting..."
+ sys.exit(0)
# -------------------------------------------------------------------
# Set env variables to control behavior
# -------------------------------------------------------------------
def handle_set(parts):
- m = re.compile('(.*)=(.*)').match(parts[0])
- key = m.group(1)
- val = m.group(2)
- set_var(key, val)
- print "%s = %s" % (key, val)
+ m = re.compile('(.*)=(.*)').match(parts[0])
+ key = m.group(1)
+ val = m.group(2)
+ set_var(key, val)
+ print "%s = %s" % (key, val)
def handle_get(parts):
- try:
- print get_var(parts[0])
- except:
- print ""
+ try:
+ print get_var(parts[0])
+ except:
+ print ""
# -------------------------------------------------------------------
# Prints help info
# -------------------------------------------------------------------
def handle_help():
- print """
+ print """
help
- show this menu
@@ -93,88 +93,88 @@
Environment variables:
SRFSH_OUTPUT = pretty - print pretty JSON and key/value pairs for network objects
= raw - print formatted JSON
- """
+ """
-
+
# -------------------------------------------------------------------
# performs an opesnrf request
# -------------------------------------------------------------------
def handle_request(parts):
- service = parts.pop(0)
- method = parts.pop(0)
- jstr = '[%s]' % join(parts)
- params = None
+ service = parts.pop(0)
+ method = parts.pop(0)
+ jstr = '[%s]' % join(parts)
+ params = None
- try:
- params = osrfJSONToObject(jstr)
- except:
- print "Error parsing JSON: %s" % jstr
- return
+ try:
+ params = osrfJSONToObject(jstr)
+ except:
+ print "Error parsing JSON: %s" % jstr
+ return
- ses = osrfClientSession(service)
+ ses = osrfClientSession(service)
- end = None
- start = time.time()
+ end = None
+ start = time.time()
- req = ses.request2(method, tuple(params))
+ req = ses.request2(method, tuple(params))
- while True:
- resp = req.recv(timeout=120)
- if not end:
- total = time.time() - start
- if not resp: break
+ while True:
+ resp = req.recv(timeout=120)
+ if not end:
+ total = time.time() - start
+ if not resp: break
- otp = get_var('SRFSH_OUTPUT')
- if otp == 'pretty':
- print "\n" + osrfDebugNetworkObject(resp.content())
- else:
- print osrfFormatJSON(osrfObjectToJSON(resp.content()))
+ otp = get_var('SRFSH_OUTPUT')
+ if otp == 'pretty':
+ print "\n" + osrfDebugNetworkObject(resp.content())
+ else:
+ print osrfFormatJSON(osrfObjectToJSON(resp.content()))
- req.cleanup()
- ses.cleanup()
+ req.cleanup()
+ ses.cleanup()
- print '-'*60
- print "Total request time: %f" % total
- print '-'*60
+ print '-'*60
+ print "Total request time: %f" % total
+ print '-'*60
def handle_math_bench(parts):
- count = int(parts.pop(0))
- ses = osrfClientSession('opensrf.math')
- times = []
+ count = int(parts.pop(0))
+ ses = osrfClientSession('opensrf.math')
+ times = []
- for i in range(100):
- if i % 10: sys.stdout.write('.')
- else: sys.stdout.write( str( i / 10 ) )
- print "";
+ for i in range(100):
+ if i % 10: sys.stdout.write('.')
+ else: sys.stdout.write( str( i / 10 ) )
+ print "";
- for i in range(count):
-
- starttime = time.time()
- req = ses.request('add', 1, 2)
- resp = req.recv(timeout=2)
- endtime = time.time()
-
- if resp.content() == 3:
- sys.stdout.write("+")
- sys.stdout.flush()
- times.append( endtime - starttime )
- else:
- print "What happened? %s" % str(resp.content())
-
- req.cleanup()
- if not ( (i+1) % 100):
- print ' [%d]' % (i+1)
-
- ses.cleanup()
- total = 0
- for i in times: total += i
- print "\naverage time %f" % (total / len(times))
+ for i in range(count):
+
+ starttime = time.time()
+ req = ses.request('add', 1, 2)
+ resp = req.recv(timeout=2)
+ endtime = time.time()
+
+ if resp.content() == 3:
+ sys.stdout.write("+")
+ sys.stdout.flush()
+ times.append( endtime - starttime )
+ else:
+ print "What happened? %s" % str(resp.content())
+
+ req.cleanup()
+ if not ( (i+1) % 100):
+ print ' [%d]' % (i+1)
+
+ ses.cleanup()
+ total = 0
+ for i in times: total += i
+ print "\naverage time %f" % (total / len(times))
@@ -183,87 +183,87 @@
# Defines the tab-completion handling and sets up the readline history
# -------------------------------------------------------------------
def setup_readline():
- class SrfshCompleter(object):
- def __init__(self, words):
- self.words = words
- self.prefix = None
-
- def complete(self, prefix, index):
- if prefix != self.prefix:
- # find all words that start with this prefix
- self.matching_words = [
- w for w in self.words if w.startswith(prefix)
- ]
- self.prefix = prefix
- try:
- return self.matching_words[index]
- except IndexError:
- return None
-
- words = 'request', 'help', 'exit', 'quit', 'opensrf.settings', 'opensrf.math', 'set'
- completer = SrfshCompleter(words)
- readline.parse_and_bind("tab: complete")
- readline.set_completer(completer.complete)
+ class SrfshCompleter(object):
+ def __init__(self, words):
+ self.words = words
+ self.prefix = None
+
+ def complete(self, prefix, index):
+ if prefix != self.prefix:
+ # find all words that start with this prefix
+ self.matching_words = [
+ w for w in self.words if w.startswith(prefix)
+ ]
+ self.prefix = prefix
+ try:
+ return self.matching_words[index]
+ except IndexError:
+ return None
+
+ words = 'request', 'help', 'exit', 'quit', 'opensrf.settings', 'opensrf.math', 'set'
+ completer = SrfshCompleter(words)
+ readline.parse_and_bind("tab: complete")
+ readline.set_completer(completer.complete)
- histfile = os.path.join(get_var('HOME'), ".srfsh_history")
- try:
- readline.read_history_file(histfile)
- except IOError:
- pass
- atexit.register(readline.write_history_file, histfile)
+ histfile = os.path.join(get_var('HOME'), ".srfsh_history")
+ try:
+ readline.read_history_file(histfile)
+ except IOError:
+ pass
+ atexit.register(readline.write_history_file, histfile)
def do_connect():
- file = os.path.join(get_var('HOME'), ".srfsh.xml")
- print_green("Connecting to opensrf...")
- osrfConnect(file, 'srfsh')
- print_red('OK\n')
+ file = os.path.join(get_var('HOME'), ".srfsh.xml")
+ print_green("Connecting to opensrf...")
+ osrfConnect(file, 'srfsh')
+ print_red('OK\n')
def load_plugins():
- # Load the user defined external plugins
- # XXX Make this a real module interface, with tab-complete words, commands, etc.
- plugins = osrfConfigValue('plugins')
- plugins = osrfConfigValue('plugins.plugin')
- if not isinstance(plugins, list):
- plugins = [plugins]
+ # Load the user defined external plugins
+ # XXX Make this a real module interface, with tab-complete words, commands, etc.
+ plugins = osrfConfigValue('plugins')
+ plugins = osrfConfigValue('plugins.plugin')
+ if not isinstance(plugins, list):
+ plugins = [plugins]
- for module in plugins:
- name = module['module']
- init = module['init']
- print_green("Loading module %s..." % name)
+ for module in plugins:
+ name = module['module']
+ init = module['init']
+ print_green("Loading module %s..." % name)
- try:
- str = 'from %s import %s\n%s()' % (name, init, init)
- exec(str)
- print_red('OK\n')
+ try:
+ str = 'from %s import %s\n%s()' % (name, init, init)
+ exec(str)
+ print_red('OK\n')
- except Exception, e:
- sys.stderr.write("\nError importing plugin %s, with init symbol %s: \n%s\n" % (name, init, e))
+ except Exception, e:
+ sys.stderr.write("\nError importing plugin %s, with init symbol %s: \n%s\n" % (name, init, e))
def set_vars():
- if not get_var('SRFSH_OUTPUT'):
- set_var('SRFSH_OUTPUT', 'pretty')
+ if not get_var('SRFSH_OUTPUT'):
+ set_var('SRFSH_OUTPUT', 'pretty')
def set_var(key, val):
- os.environ[key] = val
+ os.environ[key] = val
def get_var(key):
- try: return os.environ[key]
- except: return ''
-
-
+ try: return os.environ[key]
+ except: return ''
+
+
def print_green(str):
- sys.stdout.write("\033[01;32m")
- sys.stdout.write(str)
- sys.stdout.write("\033[00m")
- sys.stdout.flush()
+ sys.stdout.write("\033[01;32m")
+ sys.stdout.write(str)
+ sys.stdout.write("\033[00m")
+ sys.stdout.flush()
def print_red(str):
- sys.stdout.write("\033[01;31m")
- sys.stdout.write(str)
- sys.stdout.write("\033[00m")
- sys.stdout.flush()
+ sys.stdout.write("\033[01;31m")
+ sys.stdout.write(str)
+ sys.stdout.write("\033[00m")
+ sys.stdout.flush()
More information about the opensrf-commits
mailing list