diff --git a/src/java/meetup/beeno/util/HUtil.java b/src/java/meetup/beeno/util/HUtil.java index f2b7e8d..0818d48 100644 --- a/src/java/meetup/beeno/util/HUtil.java +++ b/src/java/meetup/beeno/util/HUtil.java @@ -16,6 +16,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; +/** + * Basic utilities to enable easier interaction with HBase. + */ public class HUtil { // max number of digits in a long value private static final int LONG_CHAR_LENGTH = 19; diff --git a/src/jython/jyunit/README b/src/jython/jyunit/README new file mode 100644 index 0000000..2f49443 --- /dev/null +++ b/src/jython/jyunit/README @@ -0,0 +1,99 @@ +JyUnit Testing Framework +============================================================ + +JyUnit is a simple wrapper on top of JUnit and Jython to make writing +unit tests easy. JyUnit provides hooks to: + * capture test assertion results + * batch run all tests within a directory + * combine test results for Java JUnit tests and JyUnit scripts + +JyUnit is packaged as a Jython package. The following modules are +available: + +jyunit.util + All the core assertion functions and supporting hooks for JUnit + + Supported assertion functions: + assertEquals( actual, expected [, mesg ] ) + + assertNotEquals( actual, expected [, mesg ] ) + + assertMoreThan( actual, expected [, mesg ] ) + + assertLessThan( actual, expected [, mesg ] ) + + assertFalse( condition [, mesg ] ) + + assertNotNull( obj [, mesg ] ) + + assertNull( obj [, mesg ] ) + + assertNotSame( obj1, obj2 [, mesg ] ) + + fail( [ mesg ] ) + + error( err ) + + +jyunit.run + Supporting classes for loading and running JyUnit scripts and JUnit + test cases in a batch. + + Usually run as a script to execute batches of tests: + + jython jyunit/run.py [options] + -h, --help Show this usage information + -o, --output=filename Write test results to this file + -r, --recipients=emails Email the test results to these recipients (comma separated) + -d, --directory=path Use this as the test src directory + -j, --javasrc=path Source directory for Java JUnit test cases + -p, --product=name Product name ("meetup", "alliance"), only used for output + -v, --verbose Show extra information as tests are running + + +Installing +---------------------------------------- + +Everything that you need is included in meetup_base. If you have the +meetup_base project checked out locally, then you just need to do the +following to add jython to your project: + + 1) Create a source directory for your jython tests -- this should be + distinct from the source directory for your Java JUnit tests + + 2) Copy the meetup_base/tools/jython script to where you keep your + project scripts. Edit the jython script to point to the correct + project paths for your project classpath script and jython source + directories. + + 3) Copy the meetup_base/lib/jython.jar file to your project's lib + directory. + +Writing a Test +---------------------------------------- + +Conventions: + * scripts should start with "test_" in the file name + * if scripts contain a "run_test" top level function, it will be executed + +Making assertions available: + + from jyunit.util import * + +This makes all the assertion functions available in your script's +namespace so you can call them directly: + + res = ... some test result ... 
+ assertEquals( res, "expected value" ) + + +Running Tests +---------------------------------------- + +Test scripts may be run individually, by invoking the script directly: + + jython test_sample.py + +or by invoking the JyUnit run.py script on the test source directory: + + jython /usr/local/meetup_base/jysrc/jyunit/run.py -d /path/to/testsrc diff --git a/src/jython/jyunit/__init__.py b/src/jython/jyunit/__init__.py new file mode 100644 index 0000000..722c002 --- /dev/null +++ b/src/jython/jyunit/__init__.py @@ -0,0 +1,349 @@ +# +# Setup jyunit module +# + +import difflib +import re +import time +import traceback + + +import java.lang +from junit.framework import TestCase, AssertionFailedError, ComparisonFailure, TestResult + +class SimpleResults(object): + '''Simple data structure to collect the test result output from running a + given test case. + ''' + def __init__(self, test): + self.test = test + self.startTime = None + self.endTime = None + self.failures = [] + self.errors = [] + self.runCount = 0 + self.totalTime = 0 + self.assertCount = 0 + self.testCnt = 0 + + def start(self, test): + # only set this once for a given result obj -- want the first start time + if self.startTime is None: + self.startTime = time.time() + self.runCount = self.runCount + 1 + + def end(self, test): + # always set this, want the last end time + self.endTime = time.time() + if self.startTime is not None: + self.totalTime = self.endTime - self.startTime + + def asserted(self, test): + self.assertCount = self.assertCount + 1 + + def passed(self): + return len(self.failures) == 0 and len(self.errors) == 0 + + def addFailure(self, test, failure): + self.failures.append(failure) + + def addError(self, test, error): + self.errors.append(error) + + def getFailureCount(self): + return len(self.failures) + + def getErrorCount(self): + return len(self.errors) + + def getTestCount(self): + return self.runCount + +class MultiResults(SimpleResults): + '''Represents a test class with multiple test cases''' + def __init__(self, test): + super(MultiResults, self).__init__(test) + self.cases = dict() + + (cls, name) = getTestInfo(test) + if cls is not None: + self.test = cls + if name is not None and name != cls: + self.cases[name] = SimpleResults(name) + + def start(self, test): + super(MultiResults, self).start(test) + res = self.getTestCaseResults(test) + if res is not None: + res.start(test) + + def end(self, test): + super(MultiResults, self).end(test) + res = self.getTestCaseResults(test) + if res is not None: + res.end(test) + + def addFailure(self, test, failure): + res = self.getTestCaseResults(test) + if res is not None: + res.addFailure(test, failure) + + def addError(self, test, error): + res = self.getTestCaseResults(test) + if res is not None: + res.addError(test, error) + + def passed(self): + return all( map(lambda x: x.passed(), self.cases.values()) ) + + def getFailureCount(self): + return sum( map(lambda x: x.getFailureCount(), self.cases.values()) ) + + def getErrorCount(self): + return sum( map(lambda x: x.getErrorCount(), self.cases.values()) ) + + def getTestCount(self): + return len(self.cases) + + def getAllTestCases(self): + testCases = self.cases.values() + testCases.sort( lambda x,y: cmp(str(x.test), str(y.test)) ) + return testCases + + def getTestCaseResults(self, test): + (cls, name) = getTestInfo(test) + if name != cls: + if name not in self.cases: + self.cases[name] = SimpleResults(name) + + return self.cases[name] + + return None + + +def getTestInfo(test): + '''Attempts to extract the 
test class name and individual test + name from the given test information''' + cls = None + name = None + '''Returns a tuple of the test class and name''' + if "getTestClass" in dir(test) and test.getTestClass() is not None: + if isinstance(test.getTestClass(), java.lang.Class): + cls = java.lang.Class.getName(test.getTestClass()) + name = cls + else: + cls = str(test.getTestClass()) + + teststr = str(test) + m = re.match('(\w*(?:\[\d+\])?)\(([\w\.]*)\).*', teststr) + if m: + name = m.group(1) + cls = m.group(2) + elif "getName" in dir(test): + name = test.getName() + + return (cls, name) + + + + +# ************ Exception classes ****************************** +MAX_TRACE_DEPTH = 5 +TEST_FILES = re.compile(".*jyunit/(run|util).py") +TEST_DIR = re.sub("[^/]+$", "", __file__) + + +class JythonAssertionFailedError(AssertionFailedError): + '''Override JUnit stack trace handling to provide + better reporting of error locations in Jython scripts.''' + def __init__(self, mesg): + self.traceback = traceback.extract_stack() + AssertionFailedError.__init__(self, mesg) + + def fillInStackTrace(self): + frames = [] + for (fname, line, func, txt) in self.traceback: + frames.append( java.lang.StackTraceElement(fname, func, fname, line) ) + + self.setStackTrace( frames ) + return self + + def getStackTrace(self): + frames = [] + for (fname, line, func, txt) in self.traceback: + frames.append( java.lang.StackTraceElement(fname, func, fname, line) ) + + return frames + + def printStackTrace(self, writer=None): + if writer is None: + writer = java.lang.System.err + + writer.println("AssertionFailureError: %s" % self.getMessage()) + writer.print("at:") + # just output the first stack frame, line no and text + cnt = 0 + for (fname, lineno, func, src) in self.traceback: + # skip tester.py or test_utils.py lines + if TEST_FILES.match( fname ): + continue + if cnt >= MAX_TRACE_DEPTH: + break + + fname = re.sub(TEST_DIR, "", fname) + writer.println("\t%s:%s[%d]\n\t...%s" % ( fname, func, lineno, src ) ) + cnt = cnt + 1 + writer.println() + + +class JythonComparisonFailure(ComparisonFailure): + '''Override JUnit stack trace handling to provide + better reporting of error locations in a Jython context.''' + + def __init__(self, mesg, expected, actual): + self.traceback = traceback.extract_stack() + self.mesg = mesg + self.exp = expected + self.act = actual + ComparisonFailure.__init__(self, mesg, expected, actual) + + def fillInStackTrace(self): + frames = [] + for (fname, line, func, txt) in self.traceback: + frames.append( java.lang.StackTraceElement(fname, func, fname, line) ) + + self.setStackTrace( frames ) + return self + + def getStackTrace(self): + frames = [] + for (fname, line, func, txt) in self.traceback: + frames.append( java.lang.StackTraceElement(fname, func, fname, line) ) + + return frames + + def getMessage(self): + '''Override the standard comparison failure message for strings. 
In some large comparisons, + the string output is ridiculously long, so we provide a diff version of it.''' + if self.exp is not None and isinstance( self.exp, str ) \ + and self.act is not None and isinstance( self.act, str ): + + expectLines = self.exp.splitlines(1) + actualLines = self.act.splitlines(1) + d = difflib.Differ() + diffs = d.compare(expectLines, actualLines) + + if len(expectLines) > 1 or len(actualLines) > 1: + mesg = '' + if self.mesg is not None: + mesg = self.mesg + else: + mesg = 'Actual result did not match expected' + + return mesg + ' - diff output:\n' + self.printDiff(diffs) + + # default to standard implementation + return ComparisonFailure.getMessage(self) + + def printDiff(self, diffs): + '''Returns a pseudo-diff formatted string containing the diffs between the + expected and actual strings. A single shared text line is provided before + and after each difference for context. + + Note: Line numbers are not currently provided in the diff output.''' + + diffout = '--- EXPECTED\n+++ ACTUAL\n' + lastline = None + inmatch = False + + dcnt = 0 + for l in diffs: + if len(l) == 0: + continue + if l[0] in ('-', '+', '?'): + if not inmatch: + dcnt = dcnt + 1 + # include last line for context + diffout = diffout + ('\nDiff %d:\n%s' % (dcnt, lastline)) + diffout = diffout + l + inmatch = True + else: + if inmatch: + #first new line after match so output for context + diffout = diffout + l + inmatch = False + + lastline = l + + return diffout + + + def printStackTrace(self, writer=None): + if writer is None: + writer = java.lang.System.err + + writer.println("ComparisonFailure: %s" % self.getMessage()) + writer.print("at:") + # just output the first stack frame, line no and text + cnt = 0 + for (fname, lineno, func, src) in self.traceback: + # skip tester.py or test_utils.py lines + if TEST_FILES.match( fname ): + continue + if cnt >= MAX_TRACE_DEPTH: + break + + fname = re.sub(TEST_DIR, "", fname) + writer.println("\t%s:%s[%d]\n\t...%s" % ( fname, func, lineno, src ) ) + cnt = cnt + 1 + writer.println() + + +class JythonException(java.lang.Exception): + '''Wraps Python traceback in a Java Exception implementation + so it can easily be shown in JUnit results.''' + + def __init__(self, excType, exc, tb): + if tb is None: + tb = traceback.extract_stack() + self.traceback = tb + self.exception = exc + self.excType = excType + + excInfo = "" + if self.exception is not None: + excInfo = "%s: %s" % ( self.excType.__name__, str(self.exception)) + + java.lang.Exception.__init__(self, excInfo) + + def fillInStackTrace(self): + frames = [] + for (fname, line, func, txt) in self.traceback: + frames.append( java.lang.StackTraceElement(fname, func, fname, line) ) + + self.setStackTrace( frames ) + return self + + def getStackTrace(self): + frames = [] + for (fname, line, func, txt) in self.traceback: + frames.append( java.lang.StackTraceElement(fname, func, fname, line) ) + + return frames + + def printStackTrace(self, writer=None): + if writer is None: + writer = java.lang.System.err + + + + writer.println("Jython exception: %s" % self.getMessage()) + writer.print("at:") + # no length limit and no filtering for errors + if self.traceback is not None: + for (fname, lineno, func, src) in self.traceback: + fname = re.sub(TEST_DIR, "", fname) + writer.println("\t%s:%s[%d]\n\t...%s" % ( fname, func, lineno, src ) ) + writer.println() + + diff --git a/src/jython/jyunit/report.py b/src/jython/jyunit/report.py new file mode 100644 index 0000000..917606e --- /dev/null +++ 
b/src/jython/jyunit/report.py @@ -0,0 +1,300 @@ +#********************************************************** +#* Classes and utilities for outputting test results +#* +#********************************************************** + +import os.path +import re +import smtplib +import time + +import java.lang +import java.util +import java.io + +from jyunit import SimpleResults, MultiResults, JythonAssertionFailedError, JythonComparisonFailure, JythonException + +# regex for class names to skip as "internal" in stack traces +INTERNAL_CLASS_REGEX = re.compile("(org\.python\.|sun\.reflect\.|java\.lang\.reflect\.|org\.junit\.|junit\.)") + +class ResultPrinter(object): + '''Base class for outputting test results''' + def __init__(self, results): + self.results = results + + def getFailureStack(self, exception): + '''Returns the contents of the exception stack trace as a string''' + strout = java.io.StringWriter() + pout = java.io.PrintWriter(strout) + outtxt = '' + try: + exception.printStackTrace(pout) + pout.flush() + outtxt = strout.toString() + finally: + pout.close() + strout.close() + + return outtxt + + +class TextPrinter(ResultPrinter): + '''Prints test results in a plain text format suitable for simple email reports. The output + consists of 4 separate sections: + * a header containing overall stats (total # of tests, errors and failures) + * summary list of individual test class failures + * summary list of passed test classes + * detailed failure and error output by test class + + ''' + + def __init__(self, results, out=java.lang.System.err): + self.results = results + self.out = out + + def printAll(self): + (passed, failed) = self.results.splitResults() + self.printHeader(self.out, passed, failed) + self.printSummary(self.out, passed, failed) + # print failure details + if len(failed) > 0: + self.out.println("DETAILS: ") + self.out.println("-"*50) + self.printFailedDetails(self.out, failed) + + def printHeader(self, out, passed, failed): + out.print( \ +'''\ +Run: %d, Passed: %d, Failed: %d +Generated %s +Run Time: %d sec + +''' % ( self.results.totalRunCount(), len(passed), len(failed), java.util.Date().toString(), self.results.totalRunTime() ) ) + + def printSummary(self, out, passed, failed): + if len(failed) > 0: + out.println("FAILED: ") + out.println("-"*50) + self.printFailed(out, failed) + out.println() + + if len(passed) > 0: + out.println("SUCCESSFUL: ") + out.println("-"*50) + self.printPassed(out, passed) + out.println() + + + def getTestCountText(self, res): + extrainfo = "" + if isinstance(res, MultiResults): + extrainfo = "%3d tests " % (res.getTestCount()) + if res.assertCount > 0: + if extrainfo: + extrainfo = extrainfo + "; " + extrainfo = extrainfo + ("%3d assertions" % (res.assertCount)) + + return extrainfo + + def printPassed(self, out, passed): + for res in passed: + out.println( "%-60s %4d sec; %s" % ('[ '+str(res.test)+' ]:', res.totalTime, self.getTestCountText(res)) ) + + def printFailed(self, out, failed): + + fcnt = 1 + for f in failed: + out.println( "%-60s %4d sec; %s %3d failures %3d errors" \ + % ( '[ '+str(f.test)+' ]:', f.totalTime, self.getTestCountText(f), f.getFailureCount(), f.getErrorCount() ) ) + + def printFailedDetails(self, out, failed): + for f in failed: + self.printFailure(out, f) + + def printFailure(self, out, f, depth=0): + if depth == 0: + out.print( \ +'''%-40s %4d sec; %s %3d failures %3d errors +''' % ( '[ '+str(f.test)+' ]:', f.totalTime, self.getTestCountText(f), f.getFailureCount(), f.getErrorCount() ) ) + else: + out.println( "( 
%s ) " % (str(f.test)) ) + + if isinstance(f, MultiResults): + for c in f.getAllTestCases(): + if c.getErrorCount() > 0 or c.getFailureCount() > 0: + self.printFailure(out, c, depth+1) + else: + if f.getFailureCount() > 0: + out.println(' Failures:') + self.printErrors(out, f.failures, True) + if f.getErrorCount() > 0: + out.println(' Errors:') + self.printErrors(out, f.errors, False) + out.println() + + def printErrors(self, out, errors, abbrev=False): + indent = ' '*4 + subindent = ' '*8 + if len(errors) > 0: + failCnt = 1 + for e in errors: + out.print( '''%s%d. ''' % (indent, failCnt) ) + if abbrev: + self.printFailureStack(out, e.thrownException(), subindent) + else: + e.thrownException().printStackTrace(out) + failCnt = failCnt + 1 + + def printFailureStack(self, out, exception, indent): + '''Prints an abbreviated version of the stack trace for failures''' + # delegate to custom exceptions + if isinstance(exception, JythonAssertionFailedError) or \ + isinstance(exception, JythonComparisonFailure) or \ + isinstance(exception, JythonException): + exception.printStackTrace(out) + return + + out.println("%s: %s" % (exception.getClass().getName(), exception.getMessage())) + skipping = False + for elm in exception.getStackTrace(): + if INTERNAL_CLASS_REGEX.match(elm.getClassName()): + if not skipping: + skipping = True + out.println("%s ..." % (indent)) + continue + + skipping = False + out.println("%s at %s" % (indent, elm.toString())) + + # TODO: chain exception output + +class XMLPrinter(ResultPrinter): + '''Outputs test results in an XML format like that used by the Ant JUnit task. + The results for each test class are output as a separate XML file in the common + directory passed to the constructor. Each file is named in the format: + TEST-.xml + ''' + + def __init__(self, results, dirname): + self.results = results + self.dirname = dirname + from xml.dom.minidom import getDOMImplementation + self.dom = getDOMImplementation() + + def printAll(self): + '''For each test class in the results, outputs an XML file of results for that class''' + for t in self.results.results.values(): + self.printTest(t) + + def printTest(self, test): + doc = self.dom.createDocument(None, "testsuite", None) + root = doc.documentElement + root.setAttribute("errors", str(test.getErrorCount())) + root.setAttribute("failures", str(test.getFailureCount())) + root.setAttribute("hostname", self.results.host) + root.setAttribute("name", str(test.test)) + root.setAttribute("tests", str(test.getTestCount())) + root.setAttribute("time", str(test.totalTime)) + root.setAttribute("timestamp", time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(test.startTime))) + # add runtime properties + sysprops = java.lang.System.getProperties() + propselt = doc.createElement("properties") + for n in sysprops.propertyNames(): + prop = doc.createElement("property") + prop.setAttribute("name", n) + prop.setAttribute("value", sysprops.getProperty(n)) + propselt.appendChild(prop) + + root.appendChild(propselt) + + # add individual test cases + if isinstance(test, MultiResults): + for c in test.getAllTestCases(): + self.printTestCase(doc, test, c) + else: + # print as a single test case + self.printTestCase(doc, test, test, "[main]") + # add any top level errors + #if test.getErrorCount() > 0: + # for e in test.errors: + # self.printError("error", doc, root, e) + #if test.getFailureCount() > 0: + # for f in test.failures: + # self.printError("failure", doc, root, f) + + # TODO: actually capture output here + root.appendChild( 
doc.createElement("system-out") ) + root.appendChild( doc.createElement("system-err") ) + + # output the document + fname = "TEST-%s.xml" % (test.test) + outfile = os.path.join(self.dirname, fname) + fout = open(outfile, "w") + try: + doc.writexml(fout, addindent=" ", newl="\n", encoding="UTF-8") + doc.unlink() + finally: + fout.close() + + def printTestCase(self, doc, test, caseresults, casename=None): + if casename is None: + casename = str(caseresults.test) + root = doc.documentElement + caseelt = doc.createElement("testcase") + caseelt.setAttribute("classname", str(test.test)) + caseelt.setAttribute("name", casename) + caseelt.setAttribute("time", str(caseresults.totalTime)) + root.appendChild(caseelt) + + # output any errors + if caseresults.getErrorCount() > 0: + for e in caseresults.errors: + self.printError("error", doc, caseelt, e) + if caseresults.getFailureCount() > 0: + for f in caseresults.failures: + self.printError("failure", doc, caseelt, f) + + def printError(self, eltType, doc, parent, err): + elt = doc.createElement(eltType) + if err.thrownException(): + if err.thrownException().getMessage(): + elt.setAttribute("message", err.thrownException().getMessage()) + + elt.setAttribute("type", err.thrownException().getClass().getName()) + elt.appendChild( doc.createTextNode(self.getFailureStack(err.thrownException())) ) + + parent.appendChild(elt) + + +def sendReport( collector, fname, recips, product='' ): + '''Emails the test report output to the comma-separated email addresses in "recips"''' + f = open(fname) + reportText = f.read() + f.close() + reciplist = recips.split(",") + if product is not None and len(product) > 0: + product = ' ' + product + + errorcnt = '' + errortxt = 'error' + if collector.totalErrors > 0: + if collector.totalErrors > 1: + errortxt = 'errors' + errorcnt = "%d %s" % (collector.totalErrors, errortxt) + else: + errorcnt = "Success" + + mesg = \ +'''\ +From: %s +To: %s +Subject: {DEV MODE} [%s] %s Test Run: %s + +%s +''' % ('nobody@meetup.com', ', '.join(reciplist), collector.host, product, errorcnt, reportText) + + server = smtplib.SMTP('mail.int.meetup.com') + server.sendmail('nobody@meetup.com', reciplist, mesg) + server.quit() + + diff --git a/src/jython/jyunit/run.py b/src/jython/jyunit/run.py new file mode 100644 index 0000000..b711d8e --- /dev/null +++ b/src/jython/jyunit/run.py @@ -0,0 +1,658 @@ +#********************************************************** +#* Runs all test code under test_src directory. +#* +#********************************************************** + +import os +import os.path +import sys +import getopt +import time + +from java.io import FileOutputStream, PrintStream, IOException +import java.lang +import java.util +import junit.framework +import junit.runner +import re +import jyunit.util as util +import jyunit.report as report +from jyunit.util import ResultCollector, SimpleResults + +# suppress automatic result printing from util +util.__suppress_output__ = True +# disable jyunit exception catching +#sys.excepthook = sys.__excepthook__ + +# global variable (boo, hiss!) +__debug_enabled = False + +# *********************************************** +# Adapters for different types of test cases +# +# Currently supports jython test scripts & JUnit 3 test cases. +# JUnit 4 test supported via JUnit4Adapter class +# *********************************************** + +class FileTest(junit.framework.TestCase): + ''' + This class hooks into the JUnit framework by representing + each jython test script as an individual test case. 
+ ''' + def __init__(self, filename, listener=None): + # translate filename to importable module + tmpname = re.sub("\.py$", "", filename) + self.mod_name = tmpname.replace("/", ".") + self.setName(self.mod_name) + + if listener is not None: + self.listener = listener + + def createResult(self): + util.newResults(self, testlistener=self.listener) + return util.getResults() + + def setUp(self): + # setup a new test result instance + #util.newResults(self) + pass + + def tearDown(self): + # clear the test result instance + pass + + def countTestCases(self): + # dummy this up -- 1 file == 1 test + return 1 + + def modSetUp(self, mod): + if 'setup' in dir(mod): + mod.setup() + + def modTearDown(self, mod): + if 'teardown' in dir(mod): + mod.teardown() + + def run(self, results=None): + ''' + Runs the test by importing the jython script. So any top-level + code is executed. In addition, if the script contains a function + named "run_test()", that function is executed. + ''' + util.startTest(self, results, self.listener) + debug("Running test: %s" % (self.mod_name)) + + # inject the function handlers before running the code + testmod = None + try: + __import__(self.mod_name, globals(), locals()) + testmod = sys.modules[self.mod_name] + + # setup + try: + self.modSetUp(testmod) + except java.lang.Exception, je: + util.__test_results__.error(je) + except Exception, e: + util.__test_results__.error(e) + + # check for a run_test module function -- this allows test running + # to be disabled for normal imports + if 'run_test' in dir(testmod): + debug("Executing function: %s.run_test" % self.mod_name) + testmod.run_test() + + except java.lang.Exception, je: + util.__test_results__.error(je) + except Exception, e: + print >> sys.__stdout__, ("Exception %s" % (e)) + util.__test_results__.error(e) + + # teardown + if testmod is not None: + try: + self.modTearDown(testmod) + except java.lang.Exception, je: + util.__test_results__.error(je) + except Exception, e: + util.__test_results__.error(e) + + util.endTest(self) + + # two method versions, one returns results + if results is None: + # no arg version + return util.getResults() + + def toString(self): + ''' + By default JUnit displays "testname(class name)", which is just plain + ugly here, since the only useful information is the filename. So just return + that instead. + ''' + return self.mod_name + + def __str__(self): + return self.toString() + + +class JUnitTestAdapter(junit.framework.TestCase): + ''' + Wraps an existing JUnit TestCase instance, so that the test listener can be setup correctly + ''' + def __init__(self, basetest, listener=None): + junit.framework.TestCase.__init__(self) + self.listener = listener + self.basetest = basetest + + def createResult(self): + res = self.basetest.createResult() + if self.listener is not None: + res.removeListener(self.listener) + res.addListener(self.listener) + return res + #util.newResults(self, testlistener=self.listener) + #return util.getResults() + + def getTestClass(self): + if "getTestClass" in dir(self.basetest) and self.basetest.getTestClass() is not None: + return self.basetest.getTestClass() + + return None + + def getName(self): + if "getName" in dir(self.basetest): + return self.basetest.getName() + else: + return str(self.basetest) + + def countTestCases(self): + return self.basetest.countTestCases() + + def setUp(self): + self.basetest.setUp() + + def tearDown(self): + self.basetest.tearDown() + + def run(self, results=None): + ''' + Runs the underlying JUnit TestCase. 
First we make sure our + own test listener is registered with the JUnit results instance + so we can collect stats for our own reporting. + ''' + util.startTest(self, results, self.listener) + debug("Running test: %s" % (self.getName())) + + # inject the function handlers before running the code + try: + if results is not None: + results.removeListener(self.listener) + results.addListener(self.listener) + else: + results = self.createResult() + self.basetest.run(results) + except java.lang.AssertionError, ae: + util.__test_results__.fail(ae.getMessage()) + except java.lang.Exception, je: + util.__test_results__.error(je) + except Exception, e: + debug("Exception %s" % (e)) + util.__test_results__.error(e) + + util.endTest(self) + + # two method versions, one returns results + if results is None: + # no arg version + return util.getResults() + + def toString(self): + return self.getName() + + def __str__(self): + return self.toString() + + +class JUnitSuiteAdapter(junit.framework.TestSuite): + ''' + Our own version of a JUnit TestSuite, which allows us to track + what java class is actually being tested. + ''' + def __init__(self, testclass): + junit.framework.TestSuite.__init__(self, testclass) + self.testclass = testclass + + def getTestClass(self): + #if __debug_enabled: print >> sys.__stdout__, "returning test class %s " % (str(self.testclass)) + return self.testclass + + def countTestCases(self): + ''' + JUnit annoyingly creates a "warning" test case if there are no actual + tests to run. So here we double check and ignore that if we cannot find any + test* methods in the class. + ''' + return sum( [1 for x in dir(self.testclass) if x.startswith("test")] ) + + + +# *********************************************** +# Classes for loading test cases from a directory tree +# +# *********************************************** + +# +# Patterns for matching "testable" filenames +# +TEST_FILE_REGEX = re.compile("^test.*\.py$") +JAVA_FILE_REGEX = re.compile("^.*\.java$") +HIDDEN_REGEX = re.compile("^\..*?") +TEST_UTILS_FILE = os.path.basename(util.__file__) + + +class JyTestLoader(object): + ''' + Loads jython test scripts from the given directory, matching the naming convention "testxxx.py". + Each script is run as a separate "test case". + ''' + def __init__(self, basedir, collector=None, allFiles=False): + self.basedir = basedir + self.allFiles = allFiles + debug("Base dir is %s" % (self.basedir)) + + # initialize test cases to an empty list + self.test_cases = [] + + if collector is not None: + self.collector = collector + + def loadAll(self): + ''' + Walks the entire test directory tree, and loads + all files found as test cases. + ''' + debug("Loading tests from %s" % self.basedir) + + os.path.walk(self.basedir, self.loadEntry, None) + + if __debug_enabled: + print >> sys.__stdout__, ("Found %d test cases" % len(self.test_cases)) + for t in self.test_cases: + print >> sys.__stdout__, ("\t%s" % t.getName()) + print >> sys.__stdout__, "" + + + def loadList(self, filelist): + ''' + Loads test files from the given list. A base directory + must still be provided in the constructor so we can + resolve the correct relative name. + ''' + for file in filelist: + dir, f = os.path.split(file) + dir = os.path.abspath(dir) + self.loadFile(dir, f) + + def loadEntry(self, arg, dirname, filelist): + ''' + Attempts to create an individual test case from a script file, + appending it to the internal list of test cases. 
+ ''' + #print "Dir %s" % (dirname) + #print "Files %s" % (str(filelist)) + # skip hidden files and dirs + if HIDDEN_REGEX.match(dirname): + return + + # remove hidden files and dirs for dirlist + for f in filelist: + if HIDDEN_REGEX.match(f): + filelist.remove(f) + + for f in filelist: + #print "\t%s" % (f) + self.loadFile(dirname, f) + + def loadFile(self, dirname, f): + # skip util + if TEST_UTILS_FILE == f: + return + + if self.isTestFile(dirname, f): + testcase = self.createTestCase(dirname, f) + # skip if there are no tests to run + if testcase is not None: + if testcase.countTestCases() > 0: + self.test_cases.append( testcase ) + elif __debug_enabled: + print >> sys.__stdout__, ("No tests to run, skipping...") + + def isTestFile(self, dirname, filename): + ''' + Determines is a given file constitutes a test. This allows easy extending + of JyTestLoader by implementing this and createTestCase() + ''' + return (self.allFiles or TEST_FILE_REGEX.match(filename)) + + def createTestCase(self, dirname, filename): + ''' + Creates a TestCase instance based on the given file + ''' + # get the relative dir to the file + reldir = re.sub("^"+self.basedir+"/?", '', dirname) + return FileTest( os.path.join(reldir, filename), self.collector ) + + def getTestSuite(self): + ''' + Returns a JUnit junit.framework.TestSuite instance containing all our + internal test cases. + ''' + suite = junit.framework.TestSuite() + for t in self.test_cases: + suite.addTest(t) + + return suite + + +class JavaTestLoader(JyTestLoader): + ''' + Loads any Java JUnit test cases from the given directory. + ''' + def __init__(self, basedir, collector=None): + JyTestLoader.__init__(self, basedir, collector) + + def isTestFile(self, dirname, filename): + debug("Testing dir: %s, file: %s" % (dirname, filename)) + if JAVA_FILE_REGEX.match(filename): + classFile = self.getClassFile(dirname, filename) + clazz = self.getClass(classFile) + if clazz is not None: + debug("Class file: %s, Class: %s" % (classFile, str(clazz))) + if junit.framework.Test.isAssignableFrom(clazz) or 'suite' in dir(clazz): + return True + + #print >> sys.__stdout__, ("Not assignable") + return False + + + def createTestCase(self, dirname, filename): + classFile = self.getClassFile(dirname, filename) + clazz = self.getClass(classFile) + return makeJUnitTest(clazz, self.collector) + + + def getClassFile(self, dirname, filename): + # get the relative dir to the file + reldir = re.sub("^"+self.basedir+"/?", '', dirname) + return os.path.join(reldir, filename) + + def getClass(self, fullFilename): + # remove any file extension + cname = re.sub('\.java$', '', fullFilename) + cname = re.sub('/', '.', cname) + try: + return java.lang.Class.forName(cname) + except java.lang.Exception, e: + debug("Error loading test class: %s" % e.getMessage()) + e.printStackTrace(java.lang.System.err) + return None + + +class TestRunner(object): + ''' + Loads test cases and collects the results + + NOTE: This is unused for the moment. Instead we use JUnit to run the tests. + ''' + def __init__(self, collector=None): + # extract this module's base dir + testsmod = util + + # initialize test cases to an empty list + self.test_cases = [] + + if collector is not None: + self.collector = collector + + def addSuite(self, testsuite): + self.addAll( testsuite.tests() ) + + def addAll(self, tests): + for t in tests: + self.test_cases.append(t) + + def runAll(self): + ''' + Convenience function to directly run test cases. 
This is currently for debugging + purposes, as the actual test suites are normally run through the JUnit TestRunner + instance. + ''' + for t in self.test_cases: + # run each test + t.setUp() + results = t.run() + + self.collector.printAll() + + +def debug(mesg): + ''' + Log debugging information, if enabled + ''' + if __debug_enabled: print >> sys.__stdout__, (mesg) + +def makeJUnitTest(clazz, collector): + junittest = None + if clazz is not None: + if junit.framework.Test.isAssignableFrom(clazz): + junittest = JUnitSuiteAdapter(clazz) + elif 'suite' in dir(clazz): + junittest = clazz.suite() + + if junittest is not None: + return JUnitTestAdapter( junittest, collector ) + + return None + + +# *********************************************** +# Main execution +# +# *********************************************** + +def getOptions(): + # get any passed in options + try: + optlist, args = getopt.getopt( sys.argv[1:], "hvo:r:d:p:j:t:x:f:", \ + ["help", "verbose", "output=", "recipients=", "directory=", "product=", "javasrc=", "testclass=", "xmldir=", "file=", "debug"] ) + except getopt.GetoptError: + usage() + sys.exit(2) + + # store options in a simple object + opts = dict() + opts['verbose'] = False + opts['product'] = '' + for o, v in optlist: + if o in ("-h", "--help"): + usage() + sys.exit() + elif o in ("-v", "--verbose"): + opts['verbose'] = True + elif o in ("-o", "--output"): + opts['output'] = v + elif o in ("-r", "--recipients"): + opts['recipients'] = v + elif o in ("-d", "--directory"): + opts['directory'] = v + elif o in ("-p", "--product"): + opts['product'] = v + elif o in ("-j", "--javasrc"): + opts['javasrc'] = v + elif o in ("-t", "--testclass"): + opts['testclass'] = v + elif o in ("-x", "--xmldir"): + opts['xmldir'] = v + elif o in ("-f", "--file"): + opts['inputfile'] = v + elif o == "--debug": + global __debug_enabled + __debug_enabled = True + util.__debug_enabled = True + + return (opts, args) + +def usage(): + print ''' +Usage:\t%s [options] [testfile1 testfile2 ... 
] + + -h, --help Show this usage information + -o, --output=filename Write test results to this file + -r, --recipients=emails Email the test results to these recipients (comma separated) + -d, --directory=path Use this as the test src directory + -j, --javasrc=path Source directory for Java JUnit test cases + -t, --testclass=clsname To load and run an individual JUnit test case + -f, --file=filename Read list of test files from filename + -p, --product=name Product name ("meetup", "alliance"), only used for output + -x, --xmldir=dirname Output XML test results to directory (format same as Ant JUnit task XML formatter) + -v, --verbose Show extra information as tests are running + --debug Show debug messages for test running''' % (sys.argv[0]) + + +def createOutput( outfile ): + try: + fout = FileOutputStream( outfile ) + return PrintStream( fout ) + except IOException: + print "Error opening output file" + return None + + +def main(opts, args): + ''' + Main execution when the run script is called directly + ''' + runResults = ResultCollector() + alltests = junit.framework.TestSuite() + + if 'testclass' in opts: + # single test class + try: + clazz = java.lang.Class.forName(opts['testclass']) + alltests.addTest(makeJUnitTest(clazz, runResults)) + except java.lang.Exception, e: + print >> sys.__stdout__, ("Error loading test class: %s" % e.getMessage()) + e.printStackTrace(java.lang.System.err) + sys.exit(2) + else: + # read in list of tests from input file, if specified + filelist = None + if 'inputfile' in opts: + fin = None + filelist = [] + try: + try: + if opts['inputfile'] == '-': + # using stdin + fin = sys.stdin + else: + fin = open(opts['inputfile']) + + filelist = [l.strip() for l in fin] + finally: + if fin is not None: fin.close() + except IOError, ioe: + print >> sys.__stdout__, ("Error reading input file %s" % opts['inputfile']) + print >> sys.__stderr__, str(ioe) + sys.exit(3) + + debug("Input file list is:\n%s\n" % ('\n'.join(filelist))) + + # add the specified directory to the python path + if 'directory' in opts: + d = os.path.abspath( opts['directory'] ) + if d not in sys.path: + sys.path.append(d) + #print "PYTHONPATH is %s" % (sys.path) + + loader = JyTestLoader(d, runResults) + # if files listed, use those + if filelist is not None: + # override test filename matching -- since the files are specified + loader.allFiles = True + pyfiles = [f for f in filelist if f.endswith(".py")] + loader.loadList(pyfiles) + else: + # otherwise walk the directory + loader.loadAll() + + for x in loader.getTestSuite().tests(): + alltests.addTest(x) + + # load any Java JUnit tests + javaloader = None + if opts.has_key('javasrc'): + javaloader = JavaTestLoader(opts['javasrc'], runResults) + # use explicit file list, if specified + if filelist is not None: + javafiles = [f for f in filelist if f.endswith(".java")] + javaloader.loadList(javafiles) + else: + javaloader.loadAll() + + if __debug_enabled: print >> sys.__stdout__, ("Found %d java tests" % (len(javaloader.test_cases))) + + for x in javaloader.getTestSuite().tests(): + alltests.addTest(x) + + # direct output to file if specified + out = java.lang.System.err + if opts.has_key('output'): + out = createOutput( opts['output'] ) + + # redirect stdout to temp file for test modules + testout = None + if not opts['verbose']: + testout = open( time.strftime('/tmp/tester_stdout.log'), 'w') + sys.stdout = testout + + + # run with JUnit + runner = None + runner = junit.textui.TestRunner() + + runner.doRun( alltests ) + + printer = 
report.TextPrinter(runResults, out) + printer.printAll() + print >> sys.__stdout__, ("ERROR COUNT: %d" % runResults.totalErrors) + + # output XML results if requested + if 'xmldir' in opts: + try: + dirname = os.path.abspath(opts['xmldir']) + xmlout = report.XMLPrinter(runResults, dirname) + xmlout.printAll() + except IOError, ioe: + print >> sys.__stderr__, ("Unable to output XML results: %s" % str(ioe)) + + if testout is not None: + testout.flush() + testout.close() + + # send out an email notice + if opts.has_key("recipients") and opts.has_key("output"): + report.sendReport( runResults, opts['output'], opts['recipients'], opts['product'] ) + + +if __name__ == '__main__': + # first load any options + (opts, args) = getOptions() + + # check for at least one of the required sources + if not ('directory' in opts or 'javasrc' in opts or 'testclass' in opts) and len(args) == 0: + usage() + sys.exit(2) + + main(opts, args) diff --git a/src/jython/jyunit/util.py b/src/jython/jyunit/util.py new file mode 100644 index 0000000..ec857f0 --- /dev/null +++ b/src/jython/jyunit/util.py @@ -0,0 +1,485 @@ +from java.util import HashMap +from java.util import ArrayList +from junit.framework import TestCase, AssertionFailedError, ComparisonFailure, TestResult +import junit +from types import MethodType, FunctionType +import traceback +import java.lang +import re +import sys +import time +import difflib + +from jyunit import SimpleResults, MultiResults, JythonAssertionFailedError, JythonComparisonFailure, JythonException, getTestInfo +from jyunit.report import TextPrinter + +exportedNames = ['makemap', 'FunctionHandler', 'SimpleTest', 'AssertHandler', 'printTestResults'] + + +def makemap(**vals): + map = HashMap() + for k,v in vals.items(): + print "setting %s : %s" % ( k,v ) + map[k] = v + return map + +def makelist(*vals): + list = ArrayList() + for v in vals: + list.add(v) + return list + + +class FunctionHandler(object): + def __init__(self, test_case, func): + self.test_case = test_case + self.func = func + + def __call__(self, *args): + #print "Calling %s" % (self.func) + apply(self.func, (self.test_case,) + args) + +# ******************* JUnit integration/emulation ********************* + +class SimpleTest(TestCase): + '''Dummy TestCase implementation to provide easy integration + into JUnit reporting and running.''' + def __init__(self, name=None): + if name is None: + name = __name__ + self.setName(name) + + def createResult(self): + # use the module-level result helpers defined below + newResults(self) + return getResults() + + def run(self): + '''There is nothing to run here. The test conditions + are actually invoked in the opposite direction using the + assert*() and fail*() functions. This class just fills + in compatibility with the JUnit framework.''' + pass + + + def toString(self): + '''By default JUnit displays "testname(class name)", which is just plain + ugly here, since the only useful information is the filename. 
So just return + that instead.''' + return self.name + + def __str__(self): + return self.toString() + +# *************** Assertion handler ***************************** + +class AssertHandler(object): + '''Simply implementation of JUnit Assert methods + to allow these to be called directly by importing + code.''' + + def __init__(self, test_case, results=None, listener=None): + self.test_case = test_case + if results is None: + results = TestResult() + self.test_results = results + + self.listener = None + self.setListener( listener ) + + def setListener(self, listener): + if listener is not None: + self.listener = listener + # try clearing the listener if already registered + self.test_results.removeListener(listener) + self.test_results.addListener(listener) + + def assertEquals(self, actual, expected, mesg=None, delta=None): + if self.listener is not None: + self.listener.assertCalled( self.test_case ) + + if delta is not None: + # check if these are both numbers and we have an acceptable buffer window + # for floating point inaccuracies + if abs(actual - expected) > delta: + if mesg is None: + mesg = "Values are not equal (within allowed delta)" + self.test_results.addFailure(self.test_case, JythonComparisonFailure(mesg, str(expected), str(actual))) + + elif expected != actual: + if mesg is None: + mesg = "Values are not equal" + self.test_results.addFailure(self.test_case, JythonComparisonFailure(mesg, str(expected), str(actual))) + + def assertNotEquals(self, actual, expected, mesg=None): + if self.listener is not None: + self.listener.assertCalled( self.test_case ) + + if expected == actual: + if mesg is None: + mesg = "Values are equal, but expected not" + self.test_results.addFailure(self.test_case, JythonComparisonFailure(mesg, str(expected), str(actual))) + + def assertMoreThan(self, actual, expected, mesg=None): + if self.listener is not None: + self.listener.assertCalled( self.test_case ) + + if not actual > expected: + if mesg is None: + mesg = "Value is less than expected " + self.test_results.addFailure(self.test_case, JythonComparisonFailure(mesg, str(expected), str(actual))) + + def assertLessThan(self, actual, expected, mesg=None): + if self.listener is not None: + self.listener.assertCalled( self.test_case ) + + if not actual < expected : + if mesg is None: + mesg = "Value is more than expected " + self.test_results.addFailure(self.test_case, JythonComparisonFailure(mesg, str(expected), str(actual))) + + def assertFalse(self, condition, mesg=None): + if self.listener is not None: + self.listener.assertCalled( self.test_case ) + + if (condition): + if mesg is None: + mesg = "Expected false value, but condition was true" + self.test_results.addFailure(self.test_case, JythonAssertionFailedError(mesg)) + + def assertMatches(self, actual, pattern, mesg=None, flags=None): + if self.listener is not None: + self.listener.assertCalled( self.test_case ) + + matchval = None + if flags is not None: + matchval = re.match(pattern, actual, flags) + else: + matchval = re.match(pattern, actual) + + if not matchval: + if mesg is None: + mesg = "Value doesn't match regex " + self.test_results.addFailure(self.test_case, JythonComparisonFailure(mesg, str(pattern), str(actual))) + + def assertNotMatches(self, actual, pattern, mesg=None, flags=None): + if self.listener is not None: + self.listener.assertCalled( self.test_case ) + + matchval = None + if flags is not None: + matchval = re.match(pattern, actual, flags) + else: + matchval = re.match(pattern, actual) + + if matchval: + if mesg is 
None: + mesg = "Value matches regex, but shouldn't " + self.test_results.addFailure(self.test_case, JythonComparisonFailure(mesg, str(pattern), str(actual))) + + + def assertNotNull(self, obj, mesg=None): + if self.listener is not None: + self.listener.assertCalled( self.test_case ) + + if (obj is None): + if mesg is None: + mesg = "Object was null, but expected non-null" + self.test_results.addFailure(self.test_case, JythonAssertionFailedError(mesg)) + + def assertNull(self, obj, mesg=None): + if self.listener is not None: + self.listener.assertCalled( self.test_case ) + + if (obj is not None): + if mesg is None: + mesg = "Object was not null, but expected null reference" + self.test_results.addFailure(self.test_case, JythonAssertionFailedError(mesg)) + + def assertNotSame(self, actual, expected, mesg=None): + if self.listener is not None: + self.listener.assertCalled( self.test_case ) + + if (expected == actual): + if mesg is None: + mesg = "Expected references to different objects" + self.test_results.addFailure(self.test_case, JythonComparisonFailure(mesg, str(expected), str(actual))) + + def assertTrue(self, condition, mesg=None): + if self.listener is not None: + self.listener.assertCalled( self.test_case ) + + if not condition: + if mesg is None: + mesg = "Expected condition to be true, but was false" + self.test_results.addFailure(self.test_case, JythonAssertionFailedError(mesg)) + + def assertExcept(self, func, targ, exc, *args): + if self.listener is not None: + self.listener.assertCalled( self.test_case ) + + if not callable(func): + self.fail("Function %s not callable" % str(func)) + return + + callargs = [targ] + if args is not None and len(args) > 0: + callargs[1:] = args + + try: + apply(func, callargs) + self.fail("Expected %s calling %s" % (str(exc), str(func))) + except exc, e: + # expected condition + pass + except Exception, e: + self.fail("Unexpected exception %s calling %s" % (str(e), str(func))) + + + + def fail(self, mesg=None): + if mesg is None: + mesg = "Test failed" + self.test_results.addFailure(self.test_case, JythonAssertionFailedError(mesg)) + + def error(self, err): + if isinstance(err, java.lang.Throwable): + self.test_results.addError(self.test_case, err) + elif isinstance(err, Exception): + #traceback.print_exc() + if sys.exc_traceback is not None: + tb = traceback.extract_tb( sys.exc_traceback ) + else: + tb = traceback.extract_stack() + self.test_results.addError(self.test_case, JythonException(sys.exc_type, err, tb)) + else: + self.fail('Error: %s' % (str(err))) + + def getResults(self): + return self.test_results + + def printResults(self, cnt=None): + if cnt is not None: + print "%d)" % (cnt) + if (self.test_results.failureCount() + self.test_results.errorCount()) == 0: + print "Test %s: SUCCESS!" 
% (self.test_case.getName()) + else: + print "Test %s: FAILED!\n" % self.test_case.getName() + + # show test failures + print "%d failures" % self.test_results.failureCount() + failEnum = self.test_results.failures() + cnt = 1 + while (failEnum.hasMoreElements()): + nextFail = failEnum.nextElement() + print "Failure %d: %s" % (cnt, nextFail.exceptionMessage()) + print nextFail.trace() + cnt = cnt + 1 + + print "" + + # show test errors + print "%d errors" % self.test_results.errorCount() + failEnum = self.test_results.errors() + cnt = 1 + while (failEnum.hasMoreElements()): + nextFail = failEnum.nextElement() + print "Error %d:\t%s" % (cnt, nextFail.exceptionMessage()) + print nextFail.trace() + cnt = cnt + 1 + + print "" + print "Test summary: %d failure, %d errors" % (self.test_results.failureCount(), self.test_results.errorCount()) + + +def exportAssertions(handler): + # Expose the assertion method for importing code + for f in handler.__class__.__dict__.keys(): + fref = handler.__class__.__dict__[f] + #print "Checking %s, type %s" % (f, type(fref)) + if (type(fref) is MethodType or type(fref) is FunctionType) and (f.startswith("assert") or f.startswith("fail")): + #print "Setting %s" % f + globals()[f] = FunctionHandler(handler, handler.__class__.__dict__[f]) + exportedNames.append(f) + +# print out the test results +def printTestResults(onexit=False): + global __test_results__ + global __suppress_output__ + # do nothing for auto call when suppressed + if onexit and __suppress_output__: + return + + # print out all queued results + TextPrinter(__completed_results__).printAll() + + +def failOnException(exc_type, exc_value, exc_trace): + global __test_results__ + global __debug__ + + # always bail in debug mode + if __debug_enabled: + sys.__excepthook__(exc_type, exc_value, exc_trace) + elif isinstance(exc_value, java.lang.Error) and not isinstance(exc_value, java.lang.AssertionError): + # bail on java errors + sys.__excepthook__(exc_type, exc_value, exc_trace) + elif isinstance(exc_value, Exception): + sys.__excepthook__(exc_type, exc_value, exc_trace) + + __test_results__.error(exc_value) + +def newResults(testcase, testresults=None, testlistener=None): + global __test_results__ + global __completed_results__ + if testlistener is None: + testlistener = __completed_results__ + + __test_results__ = AssertHandler(testcase, testresults, testlistener) + + # add the current results to the completed queue + #__completed_results__.append(__test_results__) + exportAssertions(__test_results__) + +def startTest(testcase, testresults=None, testlistener=None): + #newResults(testcase, testresults, testlistener) + global __test_results__ + __test_results__.test_case = testcase + __test_results__.setListener( testlistener ) + getResults().startTest(testcase) + +def endTest(testcase): + getResults().endTest(testcase) + +def getResults(): + global __test_results__; + return __test_results__.getResults() + + +def runTest(testcase): + # initialize new results + #newResults(testcase) + startTest(testcase) + try: + testcase.run() + except java.lang.Exception, je: + __test_results__.error(je) + except Exception, e: + __test_results__.error(e) + endTest(testcase) + + + +class ResultCollector(junit.framework.TestListener): + '''Stores test results indexed by tests, so we can easily retrieve stats. + This is used the by jyunit.run module to collect test results over a batch test run, + so that results can be printed at the completion of the test run. 
+ + Internally, a SimpleResults instance is stored for each test case. + ''' + def __init__(self): + self.results = dict() + self.totalErrors = 0 + # overall test run time + self.runStart = None + self.runStop = None + + # collect some basic info on the run environment + import socket + self.host = socket.gethostname() + + + def startTest(self, test): + if self.runStart is None: + self.runStart = time.time() + self.getResults(test).start(test) + + def endTest(self, test): + self.runStop = time.time() + self.getResults(test).end(test) + + def addFailure(self, test, failedError): + r = self.getResults(test) + r.addFailure(test, junit.framework.TestFailure(test, failedError) ) + self.totalErrors = self.totalErrors + 1 + + def addError(self, test, exception): + # treat assertions as failures, not errors + if isinstance(exception, java.lang.AssertionError): + self.addFailure(test, exception) + return + + r = self.getResults(test) + r.addError(test, junit.framework.TestFailure(test, exception) ) + self.totalErrors = self.totalErrors + 1 + + def assertCalled(self, test): + self.getResults(test).asserted(test) + + def getTestName(self, test): + (cls, name) = getTestInfo(test) + return name + + def getResults(self, test): + (tcls, tname) = getTestInfo(test) + key = tcls + if tcls is None: + key = tname + res = None + if not self.results.has_key(key): + if tcls is not None: + self.results[key] = MultiResults(test) + else: + self.results[key] = SimpleResults(test) + + return self.results[key] + + def splitResults(self): + passed = [] + failed = [] + + for (k, v) in self.results.items(): + if v.passed(): + passed.append(v) + else: + failed.append(v) + + if len(passed) > 0: + passed.sort( lambda x,y: cmp(str(x.test), str(y.test)) ) + if len(failed) > 0: + failed.sort( lambda x,y: cmp(str(x.test), str(y.test)) ) + return (passed, failed) + + def runPassed(self): + (p, f) = self.splitResults() + return len(f) == 0 + + def totalRunCount(self): + return len(self.results) + + def totalRunTime(self): + if self.runStop is not None and self.runStart is not None: + return self.runStop - self.runStart + + return 0 + + + +# ****************************************************************** +# Setup for reporting on exit when directly running a test script +# ****************************************************************** + +# store completed tests in a list until program exit +__completed_results__ = ResultCollector() + +# provide access to junit asset* methods for direct execution of files +newResults(SimpleTest(sys.argv[0]), testlistener=__completed_results__) + +__suppress_output__ = False +__debug_enabled = False + +# Add a global exception handling hook to report uncaught exceptions as errors +sys.excepthook = failOnException + +# Register an exit function to print out the results when a script is run directly +import atexit +atexit.register(printTestResults, True)
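For illustration, a minimal JyUnit test script following the conventions described
in the README might look like the sketch below. The module name test_sample.py and
the add() helper are hypothetical stand-ins for real code under test; the setup(),
teardown() and run_test() hooks are the ones looked up by jyunit/run.py when it
imports a test module.

    # test_sample.py -- illustrative sketch of a JyUnit test script
    from jyunit.util import *

    def add(a, b):
        # hypothetical stand-in for the code under test
        return a + b

    def setup():
        # optional: invoked by jyunit.run before the test module runs
        pass

    def run_test():
        # invoked by jyunit/run.py after the module is imported
        assertEquals( add(1, 2), 3, "1 + 2 should equal 3" )
        assertNotEquals( add(2, 2), 5 )

    def teardown():
        # optional: invoked by jyunit.run after the test completes
        pass

    if __name__ == '__main__':
        # when invoked directly (jython test_sample.py), run the test explicitly;
        # results are still reported on exit via jyunit.util
        run_test()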