1 |
# Copyright (c) 2002, 2003 by Intevation GmbH |
2 |
# Authors: |
3 |
# Bernhard Herzog <[email protected]> |
4 |
# |
5 |
# This program is free software under the GPL (>=v2) |
6 |
# Read the file COPYING coming with Thuban for details. |
7 |
|
8 |
""" |
9 |
Support classes and function for the test suite |
10 |
""" |
11 |
|
12 |
__version__ = "$Revision$" |
13 |
# $Source$ |
14 |
# $Id$ |
15 |
|
16 |
import os, sys |
17 |
import unittest |
18 |
import traceback |
19 |
|
20 |
import postgissupport |
21 |
|
22 |
|
23 |
def thuban_dir(): |
24 |
"""Return the directory containing the Thuban package""" |
25 |
thisdir = os.path.dirname(__file__) |
26 |
return os.path.join(thisdir, os.pardir) |
27 |
|
28 |
def resource_dir(): |
29 |
return os.path.join(thuban_dir(), "Resources") |
30 |
|
31 |
def add_thuban_dir_to_path(): |
32 |
"""Insert the Thuban directory at the beginning of the python path. |
33 |
|
34 |
If it's already part of the path, remove later occurrences. |
35 |
""" |
36 |
dir = thuban_dir() |
37 |
while 1: |
38 |
try: |
39 |
sys.path.remove(dir) |
40 |
except ValueError: |
41 |
break |
42 |
sys.path.insert(0, dir) |
43 |
|
44 |
|
45 |
_initthuban_done = 0 |
46 |
def initthuban(): |
47 |
"""Initialize the interpreter for using Thuban modules |
48 |
""" |
49 |
global _initthuban_done |
50 |
if not _initthuban_done: |
51 |
# Thuban uses gettext to translate some strings. Some of these |
52 |
# strings are tested for equality in some test cases. So we |
53 |
# unset any LANG environment setting to make sure only the |
54 |
# untranslated messages are used. |
55 |
try: |
56 |
del os.environ["LANG"] |
57 |
except KeyError: |
58 |
pass |
59 |
add_thuban_dir_to_path() |
60 |
import thubaninit |
61 |
|
62 |
# Install a dummy translation function so that importing |
63 |
# Thuban.UI doesn't install a wx specific one for which would |
64 |
# need to import wxPython |
65 |
import Thuban |
66 |
Thuban.install_translation_function(lambda s: s) |
67 |
|
68 |
_initthuban_done = 1 |
69 |
|
70 |
|
71 |
# |
72 |
# Special test runner and result that support skipping tests |
73 |
# |
74 |
|
75 |
class SkipTest(Exception): |
76 |
"""Exception to raise in tests that are skipped for some reason |
77 |
|
78 |
For instance, since gdal support is optional, test cases that |
79 |
require gdal raise this exception to indicate that they are skipped. |
80 |
Skipped is different from failure or error in that it is expected |
81 |
under certain circumstances. |
82 |
""" |
83 |
|
84 |
class ThubanTestResult(unittest._TextTestResult): |
85 |
|
86 |
def __init__(self, stream, descriptions, verbosity): |
87 |
unittest._TextTestResult.__init__(self, stream, descriptions, |
88 |
verbosity) |
89 |
self.skipped_tests = {} |
90 |
|
91 |
def add_skipped_test(self, test, exc): |
92 |
reason = str(exc) |
93 |
self.skipped_tests.setdefault(reason, []).append(test) |
94 |
|
95 |
def count_skipped(self): |
96 |
sum = 0 |
97 |
for tests in self.skipped_tests.values(): |
98 |
sum += len(tests) |
99 |
return sum |
100 |
|
101 |
def addError(self, test, err): |
102 |
"""Extend inherited method to handle SkipTest exceptions specially |
103 |
""" |
104 |
#print "addError", test, err |
105 |
if isinstance(err[1], SkipTest): |
106 |
self.add_skipped_test(test, err[1]) |
107 |
if self.showAll: |
108 |
self.stream.writeln("skipped") |
109 |
elif self.dots: |
110 |
self.stream.write('S') |
111 |
else: |
112 |
unittest._TextTestResult.addError(self, test, err) |
113 |
|
114 |
def printErrors(self): |
115 |
if self.skipped_tests: |
116 |
if self.dots or self.showAll: |
117 |
self.stream.writeln() |
118 |
self.stream.writeln("Skipped tests:") |
119 |
for reason, tests in self.skipped_tests.items(): |
120 |
self.stream.writeln(" %s:" % reason) |
121 |
for test in tests: |
122 |
self.stream.writeln(" " + test.id()) |
123 |
unittest._TextTestResult.printErrors(self) |
124 |
|
125 |
def getDescription(self, test): |
126 |
return test.id() |
127 |
|
128 |
|
129 |
class ThubanTestRunner(unittest.TextTestRunner): |
130 |
|
131 |
def _makeResult(self): |
132 |
return ThubanTestResult(self.stream, self.descriptions, self.verbosity) |
133 |
|
134 |
def run(self, test): |
135 |
result = unittest.TextTestRunner.run(self, test) |
136 |
self.stream.writeln("skipped = %d" % result.count_skipped()) |
137 |
return result |
138 |
|
139 |
|
140 |
class ThubanTestProgram(unittest.TestProgram): |
141 |
|
142 |
def runTests(self): |
143 |
"""Extend inherited method so that we use a ThubanTestRunner""" |
144 |
self.testRunner = ThubanTestRunner(verbosity = self.verbosity) |
145 |
unittest.TestProgram.runTests(self) |
146 |
|
147 |
|
148 |
def execute_as_testsuite(callable, *args, **kw): |
149 |
"""Call callable with args as if it were the entry point to the test suite |
150 |
|
151 |
Return watever callable returns. |
152 |
|
153 |
This is a helper function for run_tests and runtests.py. Call |
154 |
callable in a try-finally block and run some cleanup and print some |
155 |
additional information in the finally block. |
156 |
|
157 |
The additionaly information include: |
158 |
|
159 |
- A list of uncollected objects (after an explicit garbage |
160 |
collector call) |
161 |
|
162 |
- any unsubscribed messages |
163 |
""" |
164 |
try: |
165 |
return callable(*args, **kw) |
166 |
finally: |
167 |
# This has to be in a finally clause because unittest.main() |
168 |
# ends with a sys.exit to make sure that the process exits with |
169 |
# an appropriate exit code |
170 |
|
171 |
# Shutdown the postgis server if it's running |
172 |
try: |
173 |
postgissupport.shutdown_test_server() |
174 |
except: |
175 |
traceback.print_exc() |
176 |
|
177 |
# Print additional information |
178 |
print_additional_summary() |
179 |
|
180 |
def run_tests(): |
181 |
"""Frontend for unittest.main that prints some additional debug information |
182 |
|
183 |
After calling unittest.main, run the garbage collector and print |
184 |
uncollected objects. Also print any un-unsubscribed messages. |
185 |
""" |
186 |
execute_as_testsuite(ThubanTestProgram) |
187 |
|
188 |
|
189 |
def print_additional_summary(): |
190 |
"""Print some additional summary information after tests have been run""" |
191 |
print_garbage_information() |
192 |
import xmlsupport |
193 |
xmlsupport.print_summary_message() |
194 |
|
195 |
def print_garbage_information(): |
196 |
"""Print information about things that haven't been cleaned up. |
197 |
|
198 |
Run the garbage collector and print uncollected objects. Also print |
199 |
any un-unsubscribed messages. |
200 |
""" |
201 |
# this function may be called indirectly from test cases that test |
202 |
# test support modules which do not use anything from thuban itself, |
203 |
# so we call initthuban so that we can import the connector module |
204 |
initthuban() |
205 |
import gc, Thuban.Lib.connector |
206 |
gc.collect() |
207 |
if gc.garbage: |
208 |
print |
209 |
print "There are %d uncollected objects:" % len(gc.garbage) |
210 |
print gc.garbage |
211 |
Thuban.Lib.connector._the_connector.print_connections() |
212 |
|
213 |
# |
214 |
|
215 |
def create_temp_dir(): |
216 |
"""Create a temporary directory and return its name. |
217 |
|
218 |
The temporary directory is always called temp and is created in the |
219 |
directory where support module is located. |
220 |
|
221 |
If the temp directory already exists, just return the name. |
222 |
""" |
223 |
name = os.path.abspath(os.path.join(os.path.dirname(__file__), "temp")) |
224 |
|
225 |
# if the directory already exists, we're done |
226 |
if os.path.isdir(name): |
227 |
return name |
228 |
|
229 |
# create the directory |
230 |
os.mkdir(name) |
231 |
return name |
232 |
|
233 |
|
234 |
class FileTestMixin: |
235 |
|
236 |
"""Mixin class for tests that use files in the temporary directory |
237 |
""" |
238 |
|
239 |
def temp_file_name(self, basename): |
240 |
"""Return the full name of the file named basename in the temp. dir""" |
241 |
return os.path.join(create_temp_dir(), basename) |
242 |
|
243 |
def temp_dir(self): |
244 |
"""Return the name of the directory for the temporary files""" |
245 |
return create_temp_dir() |
246 |
|
247 |
|
248 |
|
249 |
class FileLoadTestCase(unittest.TestCase, FileTestMixin): |
250 |
|
251 |
"""Base class for test case that test loading files. |
252 |
|
253 |
This base class provides some common infrastructure for testing the |
254 |
reading of files. |
255 |
|
256 |
Each test case should be its own class derived from this one. There |
257 |
is one file associated with each class. The contents are defined by |
258 |
the file_contents class attribute and its name by the filename |
259 |
method. |
260 |
|
261 |
Derived classes usually only have to provide appropriate values for |
262 |
the file_contents and file_extension class attributes. |
263 |
""" |
264 |
|
265 |
file_contents = None |
266 |
file_extension = "" |
267 |
|
268 |
def filename(self): |
269 |
"""Return the name of the test file to use. |
270 |
|
271 |
The default implementation simply calls self.volatile_file_name |
272 |
with a basename derived from the class name by stripping off a |
273 |
leading 'test_' and appending self.file_extension. |
274 |
""" |
275 |
name = self.__class__.__name__ |
276 |
if name.startswith("test_"): |
277 |
name = name[5:] |
278 |
return self.temp_file_name(name + self.file_extension) |
279 |
|
280 |
def setUp(self): |
281 |
"""Create the volatile file for the test. |
282 |
|
283 |
Write self.contents (which should be a string) to the file named |
284 |
by self.filename(). |
285 |
""" |
286 |
filename = self.filename() |
287 |
file = open(filename, "w") |
288 |
file.write(self.file_contents) |
289 |
file.close() |
290 |
|
291 |
|
292 |
class FloatComparisonMixin: |
293 |
|
294 |
""" |
295 |
Mixin class for tests comparing floating point numbers. |
296 |
|
297 |
This class provides a few methods for testing floating point |
298 |
operations. |
299 |
""" |
300 |
|
301 |
fp_epsilon = 1e-6 |
302 |
fp_inf = float('1e1000') # FIXME: hack for infinite |
303 |
|
304 |
def assertFloatEqual(self, test, value, epsilon = None): |
305 |
"""Assert equality of test and value with some tolerance. |
306 |
|
307 |
Assert that the absolute difference between test and value is |
308 |
less than self.fp_epsilon. |
309 |
""" |
310 |
if epsilon is None: |
311 |
epsilon = self.fp_epsilon |
312 |
if abs(test) == self.fp_inf: |
313 |
self.assertEqual(test, value) |
314 |
else: |
315 |
self.assert_(epsilon > abs(test - value), |
316 |
"abs(%g - %g) >= %g" % (test, value, epsilon)) |
317 |
|
318 |
def assertFloatSeqEqual(self, test, value, epsilon = None): |
319 |
"""Assert equality of the sequences test and value with some tolerance. |
320 |
|
321 |
Assert that the absolute difference between each corresponding |
322 |
value in test and value is less than the optional parameter |
323 |
epsilon. If epsilon is not given use self.fp_epsilon. |
324 |
""" |
325 |
self.assertEquals(len(test), len(value)) |
326 |
for i in range(len(test)): |
327 |
self.assertFloatEqual(test[i], value[i], epsilon) |
328 |
|
329 |
def assertPointListEquals(self, test, value): |
330 |
"""Assert equality of two lists of lists of tuples of float |
331 |
|
332 |
This assertion is usually used to compare the geometry of shapes |
333 |
as returned by a Shape object's Points() method, hence the name. |
334 |
""" |
335 |
for i in range(len(test)): |
336 |
self.assertEquals(len(test[i]), len(value[i])) |
337 |
for j in range(len(test[i])): |
338 |
self.assertFloatSeqEqual(test[i][j], value[i][j]) |
339 |
|
340 |
|
341 |
class SubscriberMixin: |
342 |
|
343 |
"""Mixin class for tests for messages sent through the Connector |
344 |
|
345 |
The SubscriberMixin has some methods that can be used as subscribers |
346 |
of events that when called append information about the message into |
347 |
a list of messages received. |
348 |
|
349 |
A derived class should call the clear_messages() method in both its |
350 |
setUp and tearDown methods to clear the list of messages received. |
351 |
""" |
352 |
|
353 |
def clear_messages(self): |
354 |
"""Clear the list of received messages. |
355 |
|
356 |
Call this at least in the tests setUp and tearDown methods. It's |
357 |
important to do it in tearDown too because otherwise there may |
358 |
be cyclic references. |
359 |
""" |
360 |
self.received_messages = [] |
361 |
|
362 |
def subscribe_no_params(self): |
363 |
"""Method for subscriptions without parameters. |
364 |
|
365 |
Add an empty tuple to the list of received messages. |
366 |
""" |
367 |
self.received_messages.append(()) |
368 |
|
369 |
def subscribe_with_params(self, *args): |
370 |
"""Method for subscriptions with parameters. |
371 |
|
372 |
Append the tuple will all arguments to this function (except for |
373 |
the self argument) to the list of received messages. |
374 |
""" |
375 |
self.received_messages.append(args) |
376 |
|
377 |
def check_messages(self, messages): |
378 |
"""Check whether the messages received match the list messages""" |
379 |
self.assertEquals(messages, self.received_messages) |
380 |
|