1 |
# Copyright (c) 2002, 2003, 2004, 2005 by Intevation GmbH |
2 |
# Authors: |
3 |
# Bernhard Herzog <[email protected]> |
4 |
# |
5 |
# This program is free software under the GPL (>=v2) |
6 |
# Read the file COPYING coming with Thuban for details. |
7 |
|
8 |
""" |
9 |
Support classes and functions for the test suite
10 |
""" |
11 |
|
12 |
__version__ = "$Revision$" |
13 |
# $Source$ |
14 |
# $Id$ |
15 |
|
16 |
import os, sys |
17 |
import unittest |
18 |
import traceback |
19 |
|
20 |
import postgissupport |
21 |
|
22 |
|
23 |
def thuban_dir():
    """Return the directory containing the Thuban package

    This is the parent of the directory this module is located in. The
    path is not normalized: it is simply this module's directory with
    os.pardir appended.
    """
    return os.path.join(os.path.dirname(__file__), os.pardir)
27 |
|
28 |
def resource_dir():
    """Return the "Resources" directory located below thuban_dir()"""
    base = thuban_dir()
    return os.path.join(base, "Resources")
30 |
|
31 |
def add_thuban_dir_to_path():
    """Make the Thuban directory the first entry of the python path.

    All occurrences of the directory that are already in sys.path are
    removed first, so that afterwards it is listed exactly once, at the
    very beginning.
    """
    thuban = thuban_dir()
    # list.remove only drops one entry per call, so repeat until no
    # occurrence is left
    while thuban in sys.path:
        sys.path.remove(thuban)
    sys.path.insert(0, thuban)
43 |
|
44 |
|
45 |
# Set to 1 by initthuban once the initialization has been performed
_initthuban_done = 0
def initthuban():
    """Initialize the interpreter for using Thuban modules

    The initialization is only performed on the first call; later calls
    return immediately. The initialization

     - removes the language selecting variables from os.environ

     - inserts the Thuban directory at the beginning of sys.path and
       imports thubaninit

     - installs a function that returns its argument unchanged as
       Thuban's translation function

     - sets Thuban's internal encoding to latin-1
    """
    global _initthuban_done
    if _initthuban_done:
        return

    # Thuban uses gettext to translate some strings. Some of these
    # strings are tested for equality in some test cases, so we have to
    # make sure only the untranslated messages are used. gettext
    # consults LANGUAGE, LC_ALL, LC_MESSAGES and LANG (in that order) to
    # choose the language, so unsetting LANG alone is not sufficient
    # when one of the other variables is set.
    for variable in ("LANGUAGE", "LC_ALL", "LC_MESSAGES", "LANG"):
        try:
            del os.environ[variable]
        except KeyError:
            pass

    add_thuban_dir_to_path()
    import thubaninit

    # Install a dummy translation function so that importing
    # Thuban.UI doesn't install a wx specific one for which it would
    # need to import wxPython
    import Thuban
    Thuban.install_translation_function(lambda s: s)

    # For the time being the default encoding in the test suite is
    # latin 1. This is mostly for historical reasons. Other
    # encodings can be specified as an argument for runtests.py.
    Thuban.set_internal_encoding("latin-1")

    _initthuban_done = 1
74 |
|
75 |
|
76 |
# |
77 |
# Special test runner and result that support skipping tests |
78 |
# |
79 |
|
80 |
class SkipTest(Exception):
    """Exception raised by a test to indicate that it is skipped

    A skipped test is neither a failure nor an error: skipping is the
    expected outcome under certain circumstances. For instance, gdal
    support is optional, so test cases that require gdal raise this
    exception when they cannot be run.
    """
88 |
|
89 |
# unittest._TextTestResult is a private name. Newer Python versions
# provide the class under the public name TextTestResult and the most
# recent ones no longer have the underscore alias at all, so prefer the
# public name and only fall back to the private one where it is missing.
_TextTestResultBase = (getattr(unittest, "TextTestResult", None)
                       or unittest._TextTestResult)


class ThubanTestResult(_TextTestResultBase):

    """Text test result that keeps track of skipped tests

    Tests that end with a SkipTest exception are not reported as errors.
    They are collected in the skipped_tests dictionary instead and
    listed by printErrors.
    """

    def __init__(self, stream, descriptions, verbosity):
        _TextTestResultBase.__init__(self, stream, descriptions, verbosity)
        # Maps the reason for skipping (the text of the SkipTest
        # exception) to the list of tests skipped for that reason
        self.skipped_tests = {}

    def add_skipped_test(self, test, exc):
        """Record test as skipped with str(exc) as the reason"""
        reason = str(exc)
        self.skipped_tests.setdefault(reason, []).append(test)

    def count_skipped(self):
        """Return the total number of skipped tests"""
        count = 0
        for tests in self.skipped_tests.values():
            count += len(tests)
        return count

    def addError(self, test, err):
        """Extend inherited method to handle SkipTest exceptions specially

        err is an exception triple as returned by sys.exc_info(). If
        the exception value is a SkipTest instance the test is recorded
        as skipped and, depending on the verbosity, "skipped" or 'S' is
        written to the stream. All other errors are passed on to the
        inherited method.
        """
        exc = err[1]
        if not isinstance(exc, SkipTest):
            _TextTestResultBase.addError(self, test, err)
            return
        self.add_skipped_test(test, exc)
        if self.showAll:
            self.stream.writeln("skipped")
        elif self.dots:
            self.stream.write('S')

    def printErrors(self):
        """Extend inherited method to list the skipped tests first

        The skipped tests are grouped by the reason for skipping them.
        """
        if self.skipped_tests:
            if self.dots or self.showAll:
                self.stream.writeln()
            self.stream.writeln("Skipped tests:")
            for reason, tests in self.skipped_tests.items():
                self.stream.writeln(" %s:" % reason)
                for test in tests:
                    self.stream.writeln(" " + test.id())
        _TextTestResultBase.printErrors(self)

    def getDescription(self, test):
        """Return the id of the test as its description"""
        return test.id()
132 |
|
133 |
|
134 |
class ThubanTestRunner(unittest.TextTestRunner):

    """Text test runner that also reports the number of skipped tests"""

    def _makeResult(self):
        """Return the ThubanTestResult that collects the test outcomes"""
        return ThubanTestResult(self.stream, self.descriptions,
                                self.verbosity)

    def run(self, test):
        """Extend inherited method to print the number of skipped tests

        Return the result object of the inherited method.
        """
        outcome = unittest.TextTestRunner.run(self, test)
        skipped = outcome.count_skipped()
        self.stream.writeln("skipped = %d" % skipped)
        return outcome
143 |
|
144 |
|
145 |
class ThubanTestProgram(unittest.TestProgram):

    """Test program that runs the tests with a ThubanTestRunner"""

    def runTests(self):
        """Extend inherited method so that we use a ThubanTestRunner

        The testRunner attribute is replaced by a new ThubanTestRunner
        with the verbosity of the test program before the inherited
        method is called.
        """
        runner = ThubanTestRunner(verbosity = self.verbosity)
        self.testRunner = runner
        unittest.TestProgram.runTests(self)
151 |
|
152 |
|
153 |
def execute_as_testsuite(callable, *args, **kw):
    """Call callable with args as if it were the entry point to the test suite

    Return whatever callable returns.

    This is a helper function for run_tests and runtests.py. The
    callable is called in a try-finally block. The finally block runs
    some cleanup and prints some additional information.

    The cleanup shuts down the postgis test server. The additional
    information includes:

     - A list of uncollected objects (after an explicit garbage
       collector call)

     - any unsubscribed messages
    """
    try:
        result = callable(*args, **kw)
    finally:
        # The cleanup has to be in a finally clause because
        # unittest.main() ends with a sys.exit to make sure that the
        # process exits with an appropriate exit code

        # Shutdown the postgis server if it's running. A failure to do
        # so is only reported, so that the summary below is still
        # printed.
        try:
            postgissupport.shutdown_test_server()
        except:
            traceback.print_exc()

        # Print additional information
        print_additional_summary()
    return result
184 |
|
185 |
def run_tests():
    """Frontend for unittest.main that prints some additional debug information

    The tests are run by instantiating ThubanTestProgram through
    execute_as_testsuite. Afterwards the garbage collector is run and
    uncollected objects are printed, as well as any un-unsubscribed
    messages.
    """
    execute_as_testsuite(ThubanTestProgram)
192 |
|
193 |
|
194 |
def print_additional_summary():
    """Print some additional summary information after tests have been run

    The garbage information is printed first, followed by the summary
    message of the xmlsupport module.
    """
    print_garbage_information()
    # xmlsupport is only imported when the summary is actually printed
    import xmlsupport
    xmlsupport.print_summary_message()
199 |
|
200 |
def print_garbage_information():
    """Print information about things that haven't been cleaned up.

    Run the garbage collector and print the objects it could not
    collect, if there are any. Afterwards let Thuban's connector print
    its connections, i.e. any un-unsubscribed messages.
    """
    # this function may be called indirectly from test cases that test
    # test support modules which do not use anything from thuban itself,
    # so we call initthuban so that we can import the connector module
    initthuban()
    import gc
    import Thuban.Lib.connector
    gc.collect()
    leftovers = gc.garbage
    if leftovers:
        # Each print call has exactly one argument, so that the
        # statements produce the same output whether print is a
        # statement or a function.
        print("")
        print("There are %d uncollected objects:" % len(leftovers))
        print(leftovers)
    Thuban.Lib.connector._the_connector.print_connections()
217 |
|
218 |
# |
219 |
|
220 |
def create_temp_dir():
    """Create a temporary directory and return its name.

    The temporary directory is always called temp and is created in the
    directory where the support module is located. The name returned is
    an absolute path.

    If the temp directory already exists, just return the name.

    Raise OSError if the directory cannot be created and does not exist
    as a directory already, e.g. because there's a regular file called
    temp.
    """
    name = os.path.abspath(os.path.join(os.path.dirname(__file__), "temp"))

    # Simply try to create the directory instead of testing for its
    # existence first. This way it doesn't matter if another process
    # creates the directory at the same time.
    try:
        os.mkdir(name)
    except OSError:
        # Only an already existing directory is OK
        if not os.path.isdir(name):
            raise
    return name
237 |
|
238 |
|
239 |
class FileTestMixin:

    """Mixin class for tests that work with files in the temporary directory

    Both methods create the temporary directory if it doesn't exist yet.
    """

    def temp_file_name(self, basename):
        """Return the full name of the file named basename in the temp. dir"""
        directory = create_temp_dir()
        return os.path.join(directory, basename)

    def temp_dir(self):
        """Return the name of the directory for the temporary files"""
        directory = create_temp_dir()
        return directory
251 |
|
252 |
|
253 |
|
254 |
class FileLoadTestCase(unittest.TestCase, FileTestMixin):

    """Base class for test cases that test loading files.

    This base class provides some common infrastructure for testing the
    reading of files.

    Each test case should be its own class derived from this one. There
    is one file associated with each class. The contents are defined by
    the file_contents class attribute and its name by the filename
    method.

    Derived classes usually only have to provide appropriate values for
    the file_contents and file_extension class attributes.
    """

    # The string written to the test file by setUp. Derived classes
    # have to override it.
    file_contents = None

    # Appended to the basename of the test file by filename
    file_extension = ""

    def filename(self):
        """Return the name of the test file to use.

        The default implementation simply calls self.temp_file_name with
        a basename derived from the class name by stripping off a
        leading 'test_' and appending self.file_extension.
        """
        prefix = "test_"
        name = self.__class__.__name__
        if name.startswith(prefix):
            name = name[len(prefix):]
        return self.temp_file_name(name + self.file_extension)

    def setUp(self):
        """Create the volatile file for the test.

        Write self.file_contents (which should be a string) to the file
        named by self.filename().
        """
        outfile = open(self.filename(), "w")
        # Close the file even if writing fails, e.g. because a derived
        # class did not define file_contents
        try:
            outfile.write(self.file_contents)
        finally:
            outfile.close()
295 |
|
296 |
|
297 |
class FloatComparisonMixin:

    """
    Mixin class for tests comparing floating point numbers.

    This class provides a few methods for testing floating point
    operations. It has to be combined with unittest.TestCase because
    the methods rely on the assertEqual and fail methods.
    """

    # Default tolerance of the comparisons
    fp_epsilon = 1e-6
    fp_inf = float('1e1000') # FIXME: hack for infinite

    def assertFloatEqual(self, test, value, epsilon = None):
        """Assert equality of test and value with some tolerance.

        Assert that the absolute difference between test and value is
        less than the optional parameter epsilon. If epsilon is not
        given use self.fp_epsilon.

        If test is infinite, value has to be exactly equal to test.
        """
        if epsilon is None:
            epsilon = self.fp_epsilon
        if abs(test) == self.fp_inf:
            # The difference of two infinities can't be compared with
            # epsilon, so require exact equality
            self.assertEqual(test, value)
        elif not epsilon > abs(test - value):
            self.fail("abs(%g - %g) >= %g" % (test, value, epsilon))

    def assertFloatSeqEqual(self, test, value, epsilon = None):
        """Assert equality of the sequences test and value with some tolerance.

        Assert that both sequences have the same length and that the
        absolute difference between each corresponding value in test and
        value is less than the optional parameter epsilon. If epsilon is
        not given use self.fp_epsilon.
        """
        self.assertEqual(len(test), len(value))
        for test_item, value_item in zip(test, value):
            self.assertFloatEqual(test_item, value_item, epsilon)

    def assertPointListEquals(self, test, value):
        """Assert equality of two lists of lists of tuples of float

        The lists have to have the same structure, that is, the outer
        lists have to have the same length and so do all the
        corresponding inner lists and tuples. The floats are compared
        with a tolerance of self.fp_epsilon.

        This assertion is usually used to compare the geometry of shapes
        as returned by a Shape object's Points() method, hence the name.
        """
        # Without this check additional parts in value would go
        # unnoticed and missing ones would lead to an IndexError instead
        # of a test failure
        self.assertEqual(len(test), len(value))
        for test_part, value_part in zip(test, value):
            self.assertEqual(len(test_part), len(value_part))
            for test_point, value_point in zip(test_part, value_part):
                self.assertFloatSeqEqual(test_point, value_point)
344 |
|
345 |
|
346 |
class SubscriberMixin:

    """Mixin class for tests for messages sent through the Connector

    The SubscriberMixin has some methods that can be used as subscribers
    of events. When called, they append information about the message
    to a list of messages received.

    A derived class should call the clear_messages() method in both its
    setUp and tearDown methods to clear the list of messages received.
    The class has to be combined with unittest.TestCase because
    check_messages relies on the assertEqual method.
    """

    def clear_messages(self):
        """Clear the list of received messages.

        Call this at least in the test's setUp and tearDown methods. It's
        important to do it in tearDown too because otherwise there may
        be cyclic references.
        """
        self.received_messages = []

    def subscribe_no_params(self):
        """Method for subscriptions without parameters.

        Add an empty tuple to the list of received messages.
        """
        self.received_messages.append(())

    def subscribe_with_params(self, *args):
        """Method for subscriptions with parameters.

        Append the tuple with all arguments to this function (except for
        the self argument) to the list of received messages.
        """
        self.received_messages.append(args)

    def check_messages(self, messages):
        """Check whether the messages received match the list messages"""
        # assertEqual is the canonical name of the method. The
        # assertEquals alias is deprecated and not available anymore in
        # recent Python versions.
        self.assertEqual(messages, self.received_messages)
385 |
|