/[thuban]/branches/greater-ms3/thuban/test/test_transientdb.py
ViewVC logotype

Contents of /branches/greater-ms3/thuban/test/test_transientdb.py

Parent Directory | Revision Log


Revision 1366 - (show annotations)
Thu Jul 3 13:24:28 2003 UTC (21 years, 8 months ago) by bh
File MIME type: text/x-python
File size: 16232 byte(s)
* Thuban/Model/transientdb.py (TransientJoinedTable.__init__):
Update doc-string
(TransientJoinedTable.create): Do not modify the column objects of
the input tables in place and copy all columns of the input tables
into the joined table after all.

* test/test_transientdb.py
(TestTransientTable.test_transient_joined_table_same_column_name):
Update to reflect the new behavior
(TestTransientTable.test_transient_joined_table_with_equal_column_names):
Update to reflect the new behavior
(TestTransientTable.test_transient_joined_table_name_collisions_dont_modify_in_place):
New test case for a bug which modified the column objects in place

1 # Copyright (c) 2002, 2003 by Intevation GmbH
2 # Authors:
3 # Bernhard Herzog <[email protected]>
4 #
5 # This program is free software under the GPL (>=v2)
6 # Read the file COPYING coming with Thuban for details.
7
8 """
9 Test the Transient DB classes
10 """
11
12 __version__ = "$Revision$"
13 # $Source$
14 # $Id$
15
16 import os
17 import unittest
18
19 import support
20 support.initthuban()
21
22 from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
23 FIELDTYPE_INT, FIELDTYPE_DOUBLE
24 from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
25 TransientJoinedTable, AutoTransientTable
26
27
class TestTransientTable(unittest.TestCase, support.FileTestMixin):

    """Test cases for the table classes of Thuban.Model.transientdb"""

    def setUp(self):
        """Create a fresh transient database as self.transientdb

        A database file and a journal file that already exist under the
        temporary file name are removed first.
        """
        db_path = self.temp_file_name("transient_table.sqlite")
        if os.path.exists(db_path):
            os.remove(db_path)
        journal_path = db_path + "-journal"
        if os.path.exists(journal_path):
            # A single parenthesized argument prints the same text
            # whether print is a statement or a function.
            print(" ".join(["removing journal", journal_path]))
            os.remove(journal_path)
        self.transientdb = TransientDatabase(db_path)

    def tearDown(self):
        """Close the transient database created by setUp"""
        self.transientdb.close()

    def _political_dbf_table(self):
        """Return a new DBFTable for ../Data/iceland/political.dbf"""
        return DBFTable(os.path.join("..", "Data", "iceland",
                                     "political.dbf"))

    def run_iceland_political_tests(self, table):
        """Run a series of checks on table

        The table is assumed to hold the data of the sample file
        ../Data/iceland/political.dbf.
        """
        self.assertEqual(table.NumRows(), 156)
        self.assertEqual(table.NumColumns(), 8)

        # Check one column of each of the possible field types.
        columns = table.Columns()
        for index, name, fieldtype in [(0, 'AREA', FIELDTYPE_DOUBLE),
                                       (3, 'PONET_ID', FIELDTYPE_INT),
                                       (6, 'POPYCOUN', FIELDTYPE_STRING)]:
            self.assertEqual(columns[index].name, name)
            self.assertEqual(columns[index].type, fieldtype)

        # HasColumn, called with a column name and with an index
        self.failUnless(table.HasColumn("AREA"))
        self.failUnless(table.HasColumn(1))
        # HasColumn for non-existing columns
        self.failIf(table.HasColumn("non_existing_name"))
        self.failIf(table.HasColumn(100))

        # Reading a complete row and individual values.
        expected_row = {'AREA': 19.462,
                        'PERIMETER': 88.518000000000001,
                        'PONET_': 146,
                        'PONET_ID': 145,
                        'POPYTYPE': 1,
                        'POPYREG': '1',
                        'POPYCOUN': 'IC',
                        'POPYADMIN': ''}
        self.assertEqual(table.ReadRowAsDict(144), expected_row)
        self.assertEqual(table.ReadValue(144, "AREA"), 19.462)
        self.assertEqual(table.ReadValue(144, 3), 145)

        # ValueRange may induce a copy to the transient database. It is
        # deliberately called late so that running this method a second
        # time shows whether the other methods still work after the
        # copy.
        self.assertEqual(table.ValueRange("AREA"), (0.0, 19.462))

        ponet_ids = table.UniqueValues("PONET_ID")
        ponet_ids.sort()
        self.assertEqual(ponet_ids, list(range(1, 157)))

    def test_transient_table(self):
        """Test TransientTable(dbftable)

        The TransientTable should copy the data to the
        TransientDatabase.
        """
        source = self._political_dbf_table()
        transient = TransientTable(self.transientdb, source)
        self.run_iceland_political_tests(transient)

        # The transient_table method should return the table itself
        self.assert_(transient.transient_table() is transient)

        # The title is simply copied over from the original table
        self.assertEqual(transient.Title(), source.Title())

        # Dependencies is not implemented by the TransientTable class
        # itself, so there is nothing to test for it here.

    def test_auto_transient_table(self):
        """Test AutoTransientTable(dbftable)

        The AutoTransientTable should copy the data to the
        TransientDatabase on demand.
        """
        source = self._political_dbf_table()
        table = AutoTransientTable(self.transientdb, source)

        # Two passes: one while the data has not been copied to the
        # transient db yet and one after it has. This relies on
        # run_iceland_political_tests calling, at its end, at least one
        # method that copies the data to the transient db.
        for _ in (1, 2):
            self.run_iceland_political_tests(table)

    def test_auto_transient_table_query(self):
        """Test AutoTransientTable.SimpleQuery()"""
        source = self._political_dbf_table()
        table = AutoTransientTable(self.transientdb, source)

        # Only a simple test here. The AutoTransientTable simply
        # delegates to its transient table, so the real test is in
        # test_transient_table_query. It is still important to check
        # the column handling because the AutoTransientTable and its
        # underlying transient table use different column object types.
        large_areas = table.SimpleQuery(table.Column("AREA"), ">", 10.0)
        self.assertEqual(large_areas, [144])

        # A Column object as the right-hand operand
        matching_rows = table.SimpleQuery(table.Column("POPYTYPE"),
                                          "==",
                                          table.Column("POPYREG"))
        self.assertEqual(matching_rows, list(range(156)))

    def test_auto_transient_table_dependencies(self):
        """Test AutoTransientTable.Dependencies()"""
        source = self._political_dbf_table()
        table = AutoTransientTable(self.transientdb, source)
        self.assertEqual(table.Dependencies(), (source,))

    def test_auto_transient_table_title(self):
        """Test AutoTransientTable.Title()"""
        source = self._political_dbf_table()
        table = AutoTransientTable(self.transientdb, source)
        # The title must be the same as that of the original table
        self.assertEqual(table.Title(), source.Title())

    def test_transient_joined_table(self):
        """Test TransientJoinedTable"""
        codes = MemoryTable([("type", FIELDTYPE_STRING),
                             ("code", FIELDTYPE_INT)],
                            [("OTHER/UNKNOWN", 0),
                             ("RUINS", 1),
                             ("FARM", 2),
                             ("BUILDING", 3),
                             ("HUT", 4),
                             ("LIGHTHOUSE", 5)])
        auto = AutoTransientTable(self.transientdb, codes)
        dbf_path = os.path.join("..", "Data", "iceland",
                                "cultural_landmark-point.dbf")
        landmarks = AutoTransientTable(self.transientdb, DBFTable(dbf_path))

        joined = TransientJoinedTable(self.transientdb,
                                      landmarks, "CLPTLABEL",
                                      auto, "type")

        self.assertEqual(joined.NumRows(), 34)
        self.assertEqual(joined.NumColumns(), 8)
        for index, fieldtype, name in [(0, FIELDTYPE_DOUBLE, 'AREA'),
                                       (7, FIELDTYPE_INT, 'code'),
                                       (4, FIELDTYPE_STRING, 'CLPTLABEL')]:
            self.assertEqual(joined.Column(index).type, fieldtype)
            self.assertEqual(joined.Column(index).name, name)

        # HasColumn, called with a column name and with an index
        self.failUnless(joined.HasColumn("AREA"))
        self.failUnless(joined.HasColumn(1))
        # HasColumn for non-existing columns
        self.failIf(joined.HasColumn("non_existing_name"))
        self.failIf(joined.HasColumn(100))

        # Reading a complete row and individual values
        expected_row = {'AREA': 0.0,
                        'PERIMETER': 0.0,
                        'CLPOINT_': 23,
                        'CLPOINT_ID': 38,
                        'CLPTLABEL': 'RUINS',
                        'CLPTFLAG': 0,
                        'type': 'RUINS',
                        'code': 1}
        self.assertEqual(joined.ReadRowAsDict(22), expected_row)
        self.assertEqual(joined.ReadValue(22, "type"), 'RUINS')
        self.assertEqual(joined.ReadValue(22, 7), 1)

        # The transient_table method should return the table itself
        self.assert_(joined.transient_table() is joined)

        # The TransientJoinedTable depends on both input tables
        self.assertEqual(joined.Dependencies(), (landmarks, auto))

        # The title is constructed from the titles of the input tables.
        self.assertEqual(joined.Title(),
                         "Join of %s and %s" % (landmarks.Title(),
                                                auto.Title()))

    def test_transient_joined_table_same_column_name(self):
        """Test TransientJoinedTable join on columns with same name

        The transient DB maps the column names used by the tables to
        another set of names used only inside the SQLite database.
        There was a bug in the way this mapping was used when joining
        on fields with the same name in both tables, so that the joined
        table ended up joining on the same column in the same table.
        """
        stretch_data = MemoryTable([("stretch_id", FIELDTYPE_INT)],
                                   [(0,), (1,), (2,), (3,)])
        stretches = AutoTransientTable(self.transientdb, stretch_data)

        discharge_data = MemoryTable([("disch_id", FIELDTYPE_INT),
                                      ("stretch_id", FIELDTYPE_INT)],
                                     [(1, 0), (2, 3)])
        discharges = AutoTransientTable(self.transientdb, discharge_data)

        joined = TransientJoinedTable(self.transientdb,
                                      stretches, "stretch_id",
                                      discharges, "stretch_id",
                                      outer_join=True)

        self.assertEqual(joined.NumRows(), 4)
        self.assertEqual(joined.NumColumns(), 3)

        # HasColumn
        self.failUnless(joined.HasColumn("stretch_id"))
        self.failUnless(joined.HasColumn("disch_id"))

    def test_transient_joined_table_with_equal_column_names(self):
        """Test TransientJoinedTable join on tables with equal column names

        If a name collision occurs for the field names, underscores are
        appended until the collision is resolved.
        """
        stretch_data = MemoryTable([("stretch_id", FIELDTYPE_INT),
                                    ("name", FIELDTYPE_INT)],
                                   [(0, 10), (1, 11), (2, 12), (3, 13)])
        stretches = AutoTransientTable(self.transientdb, stretch_data)

        discharge_data = MemoryTable([("disch_id", FIELDTYPE_INT),
                                      ("stretch_id", FIELDTYPE_INT),
                                      ("name", FIELDTYPE_INT)],
                                     [(1, 0, 1), (2, 3, 2)])
        discharges = AutoTransientTable(self.transientdb, discharge_data)

        joined = TransientJoinedTable(self.transientdb,
                                      stretches, "stretch_id",
                                      discharges, "stretch_id",
                                      outer_join=True)

        self.assertEqual(joined.NumRows(), 4)
        self.assertEqual(joined.NumColumns(), 5)

        # Column names in order; the colliding names of the right table
        # are expected with an underscore appended.
        column_names = [column.name for column in joined.Columns()]
        self.assertEqual(column_names,
                         ["stretch_id", "name", "disch_id", "stretch_id_",
                          "name_"])

    def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
        """Test TransientJoinedTable name collisions do not modify in place

        The name collision work-around of appending underscores
        accidentally modified the column objects in place. To detect
        this, two joins are done with the tables in reverse order: with
        the bug, the first join modifies the name in the column object
        of the right table. That table is the left table of the second
        join, so the underscored name would appear before the
        non-underscored one in the column names of the second join.
        """
        first_data = MemoryTable([("stretch_id", FIELDTYPE_INT),
                                  ("name", FIELDTYPE_INT)],
                                 [(0, 10), (1, 11), (2, 12), (3, 13)])
        first = AutoTransientTable(self.transientdb, first_data)

        second_data = MemoryTable([("stretch_id", FIELDTYPE_INT),
                                   ("name", FIELDTYPE_INT)],
                                  [(0, 10), (1, 11), (2, 12), (3, 13)])
        second = AutoTransientTable(self.transientdb, second_data)

        # The result of the first join is only needed for its possible
        # side effect on the column objects; it is replaced by the
        # reversed join right away.
        joined = TransientJoinedTable(self.transientdb,
                                      first, "stretch_id",
                                      second, "stretch_id",
                                      outer_join=True)
        joined = TransientJoinedTable(self.transientdb,
                                      second, "stretch_id",
                                      first, "stretch_id",
                                      outer_join=True)

        column_names = [column.name for column in joined.Columns()]
        self.assertEqual(column_names,
                         ["stretch_id", "name", "stretch_id_", "name_"])

    def test_transient_table_read_twice(self):
        """Test TransientTable.ReadRowAsDict() reading the same record twice"""
        codes = MemoryTable([("type", FIELDTYPE_STRING),
                             ("code", FIELDTYPE_INT)],
                            [("OTHER/UNKNOWN", 0),
                             ("RUINS", 1),
                             ("FARM", 2),
                             ("BUILDING", 3),
                             ("HUT", 4),
                             ("LIGHTHOUSE", 5)])
        table = TransientTable(self.transientdb, codes)

        # There was a bug where reading the same record twice raised an
        # exception in the second call because of an uninitialized
        # local variable. For passing the test it would be enough that
        # both reads succeed, but the results are compared as well.
        first_read = table.ReadRowAsDict(3)
        second_read = table.ReadRowAsDict(3)
        self.assertEqual(first_read, second_read)

    def test_transient_table_query(self):
        """Test TransientTable.SimpleQuery()"""
        data = MemoryTable([("type", FIELDTYPE_STRING),
                            ("value", FIELDTYPE_DOUBLE),
                            ("code", FIELDTYPE_INT)],
                           [("OTHER/UNKNOWN", -1.5, 11),
                            ("RUINS", 0.0, 1),
                            ("FARM", 3.141, 2),
                            ("BUILDING", 2.5, 3),
                            ("HUT", 1e6, 4),
                            ("LIGHTHOUSE", -0.01, 5)])
        table = TransientTable(self.transientdb, data)

        # A column compared with a value:
        # (column index, operator, value, expected row numbers)
        for column, op, value, expected in [
                (0, "==", "RUINS", [1]),
                (2, "!=", 2, [0, 1, 3, 4, 5]),
                (1, "<", 1.0, [0, 1, 5]),
                (1, "<=", -1.5, [0]),
                (2, ">", 3, [0, 4, 5]),
                (2, ">=", 3, [0, 3, 4, 5])]:
            self.assertEqual(table.SimpleQuery(table.Column(column),
                                               op, value),
                             expected)

        # Two columns as operands
        self.assertEqual(table.SimpleQuery(table.Column(1),
                                           "<=", table.Column(2)),
                         [0, 1, 3, 5])

        # Invalid operators have to raise a ValueError
        self.assertRaises(ValueError,
                          table.SimpleQuery,
                          table.Column(1), "<<", table.Column(2))
363
364
# Run the tests only when this file is executed as a script.
if __name__ == "__main__":
    support.run_tests()

Properties

Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26