/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Annotation of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1364 - (hide annotations)
Thu Jul 3 13:09:43 2003 UTC (21 years, 8 months ago) by bh
Original Path: trunk/thuban/test/test_transientdb.py
File MIME type: text/x-python
File size: 16232 byte(s)
* test/test_transientdb.py
(TestTransientTable.test_transient_joined_table_same_column_name):
Update to reflect the new behavior
(TestTransientTable.test_transient_joined_table_with_equal_column_names):
Update to reflect the new behavior
(TestTransientTable.test_transient_joined_table_name_collisions_dont_modify_in_place):
New test case for a bug which modified the column objects in place

* Thuban/Model/transientdb.py (TransientJoinedTable.__init__):
Update doc-string
(TransientJoinedTable.create): Do not modify the column objects of
the input tables in place and copy all columns of the input tables
into the joined table after all.

1 bh 765 # Copyright (c) 2002, 2003 by Intevation GmbH
2     # Authors:
3     # Bernhard Herzog <[email protected]>
4     #
5     # This program is free software under the GPL (>=v2)
6     # Read the file COPYING coming with Thuban for details.
7    
8     """
9     Test the Transient DB classes
10     """
11    
12     __version__ = "$Revision$"
13     # $Source$
14     # $Id$
15    
16     import os
17     import unittest
18    
19     import support
20     support.initthuban()
21    
22 jan 805 from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
23 bh 839 FIELDTYPE_INT, FIELDTYPE_DOUBLE
24 bh 765 from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
25     TransientJoinedTable, AutoTransientTable
26    
27    
28     class TestTransientTable(unittest.TestCase, support.FileTestMixin):
29    
30     def setUp(self):
31     """Create a transient database as self.transientdb"""
32     filename = self.temp_file_name("transient_table.sqlite")
33     if os.path.exists(filename):
34     os.remove(filename)
35     journal = filename + "-journal"
36     if os.path.exists(journal):
37     print "removing journal", journal
38     os.remove(journal)
39     self.transientdb = TransientDatabase(filename)
40    
41     def tearDown(self):
42     self.transientdb.close()
43    
44     def run_iceland_political_tests(self, table):
45     """Run some tests on tablte
46    
47     Assume that table holds the data of the file
48     ../Data/iceland/political.dbf sample file.
49     """
50 bh 818 self.assertEquals(table.NumRows(), 156)
51     self.assertEquals(table.NumColumns(), 8)
52 bh 765
53 bh 1041 # Check one each of the possible field types.
54 bh 818 columns = table.Columns()
55     self.assertEquals(columns[0].name, 'AREA')
56 bh 839 self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
57 bh 818 self.assertEquals(columns[3].name, 'PONET_ID')
58 bh 839 self.assertEquals(columns[3].type, FIELDTYPE_INT)
59 bh 818 self.assertEquals(columns[6].name, 'POPYCOUN')
60 bh 839 self.assertEquals(columns[6].type, FIELDTYPE_STRING)
61 bh 765
62 bh 839 # HasColumn
63     self.failUnless(table.HasColumn("AREA"))
64     self.failUnless(table.HasColumn(1))
65     # HasColumn for non-exisiting columns
66     self.failIf(table.HasColumn("non_existing_name"))
67     self.failIf(table.HasColumn(100))
68    
69 bh 849 # Reading rows and values.
70 bh 818 self.assertEquals(table.ReadRowAsDict(144),
71 bh 765 {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
72     'AREA': 19.462,
73     'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
74     'POPYREG': '1',
75     'PONET_ID': 145})
76 bh 849 self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
77     self.assertEquals(table.ReadValue(144, 3), 145)
78 bh 765
79 bh 839 # ValueRange may induce a copy to the transient database.
80 bh 765 # Therefore we put it last so that we can execute this method
81     # twice to check whether the other methods still work after the
82     # copy
83 bh 818 self.assertEquals(table.ValueRange("AREA"), (0.0, 19.462))
84 bh 765
85 bh 818 unique = table.UniqueValues("PONET_ID")
86 bh 765 unique.sort()
87     self.assertEquals(unique, range(1, 157))
88    
89     def test_transient_table(self):
90     """Test TransientTable(dbftable)
91    
92     The TransientTable should copy the data to the
93     TransientDatabase.
94     """
95     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
96     "political.dbf"))
97     table = TransientTable(self.transientdb, orig_table)
98     self.run_iceland_political_tests(table)
99    
100     # The transient_table method should return the table itself
101     self.assert_(table is table.transient_table())
102    
103 bh 998 # The title is simply copied over from the original table
104     self.assertEquals(table.Title(), orig_table.Title())
105    
106 bh 984 # The TransientTable class itself doesn't implement the
107     # Dependencies method, so we don't test it.
108 bh 765
109 bh 984
110 bh 765 def test_auto_transient_table(self):
111     """Test AutoTransientTable(dbftable)
112    
113     The AutoTransientTable should copy the data to the
114     TransientDatabase on demand.
115     """
116     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
117     "political.dbf"))
118     table = AutoTransientTable(self.transientdb, orig_table)
119    
120     # Run the tests twice so that we execute them once when the data
121     # has not been copied to the transient db yet and once when it
122     # has. This assumes that run_iceland_political_tests does at
123     # least one call to a method that copies to the transient db at
124     # its end.
125     self.run_iceland_political_tests(table)
126     self.run_iceland_political_tests(table)
127    
128 bh 845 def test_auto_transient_table_query(self):
129     """Test AutoTransientTable.SimpleQuery()"""
130     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
131     "political.dbf"))
132     table = AutoTransientTable(self.transientdb, orig_table)
133 jonathan 948 # Only a simple test here. The AutoTransientTable simply
134 bh 845 # delegates to its transient table so it should be OK that the
135     # real test for it is in test_transient_table_query. However,
136     # it's important to check that the column handling works
137     # correctly because the AutoTransientTable and it's underlying
138     # transient table use different column object types.
139     self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
140     [144])
141 bh 765
142 jonathan 948 # test using a Column object as the right parameter
143 bh 984 self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
144 jonathan 948 "==",
145     table.Column("POPYREG")),
146     range(156))
147    
148 bh 984 def test_auto_transient_table_dependencies(self):
149     """Test AutoTransientTable.Dependencies()"""
150     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
151     "political.dbf"))
152     table = AutoTransientTable(self.transientdb, orig_table)
153     self.assertEquals(table.Dependencies(), (orig_table,))
154    
155 bh 998 def test_auto_transient_table_title(self):
156     """Test AutoTransientTable.Title()"""
157     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
158     "political.dbf"))
159     table = AutoTransientTable(self.transientdb, orig_table)
160     # The title is of course the same as that of the original table
161     self.assertEquals(table.Title(), orig_table.Title())
162    
163 bh 765 def test_transient_joined_table(self):
164     """Test TransientJoinedTable"""
165 jan 805 simple = MemoryTable([("type", FIELDTYPE_STRING),
166 bh 765 ("code", FIELDTYPE_INT)],
167     [("OTHER/UNKNOWN", 0),
168     ("RUINS", 1),
169     ("FARM", 2),
170     ("BUILDING", 3),
171     ("HUT", 4),
172     ("LIGHTHOUSE", 5)])
173     auto = AutoTransientTable(self.transientdb, simple)
174     filename = os.path.join("..", "Data", "iceland",
175     "cultural_landmark-point.dbf")
176     landmarks = AutoTransientTable(self.transientdb, DBFTable(filename))
177    
178     table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
179     auto, "type")
180    
181 bh 818 self.assertEquals(table.NumRows(), 34)
182     self.assertEquals(table.NumColumns(), 8)
183 bh 839 self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
184     self.assertEquals(table.Column(0).name, 'AREA')
185     self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
186     self.assertEquals(table.Column(7).name, 'code')
187     self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
188     self.assertEquals(table.Column(4).name, 'CLPTLABEL')
189     # HasColumn
190     self.failUnless(table.HasColumn("AREA"))
191     self.failUnless(table.HasColumn(1))
192     # HasColumn for non-exisiting columns
193     self.failIf(table.HasColumn("non_existing_name"))
194     self.failIf(table.HasColumn(100))
195 bh 765
196 bh 849 # Reading rows and values
197 bh 839 self.assertEquals(table.ReadRowAsDict(22),
198 bh 765 {'PERIMETER': 0.0, 'CLPOINT_': 23,
199     'AREA': 0.0, 'CLPTLABEL': 'RUINS',
200     'CLPOINT_ID': 38, 'CLPTFLAG': 0,
201     'code': 1, 'type': 'RUINS'})
202 bh 849 self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
203     self.assertEquals(table.ReadValue(22, 7), 1)
204 bh 765
205     # The transient_table method should return the table itself
206     self.assert_(table is table.transient_table())
207    
208 bh 984 # The TransientJoinedTable depends on both input tables
209     self.assertEquals(table.Dependencies(), (landmarks, auto))
210 bh 765
211 bh 998 # The title is constructed from the titles of the input tables.
212     self.assertEquals(table.Title(),
213     "Join of %s and %s" % (landmarks.Title(),
214     auto.Title()))
215 bh 984
216 bh 998
217 bh 1328 def test_transient_joined_table_same_column_name(self):
218     """Test TransientJoinedTable join on columns with same name
219    
220     The transient DB maps the column names used by the tables to
221     another set of names used only inside the SQLite database. There
222     was a bug in the way this mapping was used when joining on
223     fields with the same names in both tables so that the joined
224     table ended up joining on the same column in the same table.
225     """
226     mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
227     [(i,) for i in range(4)])
228     stretches = AutoTransientTable(self.transientdb, mem_stretches)
229    
230     mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
231     ("stretch_id", FIELDTYPE_INT)],
232     [(1, 0), (2, 3)])
233     discharges = AutoTransientTable(self.transientdb, mem_discharges)
234    
235     table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
236     discharges, "stretch_id",
237     outer_join = True)
238    
239     self.assertEquals(table.NumRows(), 4)
240 bh 1364 self.assertEquals(table.NumColumns(), 3)
241 bh 1328
242     # HasColumn
243     self.failUnless(table.HasColumn("stretch_id"))
244     self.failUnless(table.HasColumn("disch_id"))
245    
246    
247 frank 1332 def test_transient_joined_table_with_equal_column_names(self):
248     """Test TransientJoinedTable join on tables with equal column names
249    
250 bh 1364 If a name collision occurs for the field names, underscores are
251     appended as long as any collision is resolved.
252 frank 1332 """
253     mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
254     ("name", FIELDTYPE_INT)],
255     [(0, 10), (1, 11), (2, 12), (3, 13) ])
256     stretches = AutoTransientTable(self.transientdb, mem_stretches)
257    
258     mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
259     ("stretch_id", FIELDTYPE_INT),
260     ("name", FIELDTYPE_INT)],
261     [(1, 0, 1), (2, 3, 2)])
262     discharges = AutoTransientTable(self.transientdb, mem_discharges)
263    
264     table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
265     discharges, "stretch_id",
266     outer_join = True)
267    
268     self.assertEquals(table.NumRows(), 4)
269 bh 1364 self.assertEquals(table.NumColumns(), 5)
270 frank 1332
271     # HasColumn
272 bh 1364 self.assertEquals([c.name for c in table.Columns()],
273     ["stretch_id", "name", "disch_id", "stretch_id_",
274     "name_"])
275 frank 1332
276 bh 1364 def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
277     """Test TransientJoinedTable name-collisions do not modifying in place
278 frank 1332
279 bh 1364 The name collision work-around by appending underscores
280     accidentally modified the column objects in place. We do two
281     joins therefore in reverse order to detect this: The first join
282     will lead to a modified name in the column object of the right
283     table which is then used as the left table in the second join so
284     the underscored name will appear before the non-underscored one
285     in the list of column names after the second join.
286     """
287     mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
288     ("name", FIELDTYPE_INT)],
289     [(0, 10), (1, 11), (2, 12), (3, 13) ])
290     table1 = AutoTransientTable(self.transientdb, mem1)
291    
292     mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
293     ("name", FIELDTYPE_INT)],
294     [(0, 10), (1, 11), (2, 12), (3, 13) ])
295     table2 = AutoTransientTable(self.transientdb, mem2)
296    
297     table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
298     table2, "stretch_id", outer_join = True)
299     table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
300     table1, "stretch_id", outer_join = True)
301    
302     self.assertEquals([c.name for c in table.Columns()],
303     ["stretch_id", "name", "stretch_id_", "name_"])
304    
305 bh 785 def test_transient_table_read_twice(self):
306 bh 842 """Test TransientTable.ReadRowAsDict() reading the same record twice"""
307 jan 805 simple = MemoryTable([("type", FIELDTYPE_STRING),
308 bh 785 ("code", FIELDTYPE_INT)],
309     [("OTHER/UNKNOWN", 0),
310     ("RUINS", 1),
311     ("FARM", 2),
312     ("BUILDING", 3),
313     ("HUT", 4),
314     ("LIGHTHOUSE", 5)])
315     table = TransientTable(self.transientdb, simple)
316 bh 765
317 bh 785 # There was a bug where reading the same record twice would
318     # raise an exception in the second call because of an
319     # unitialized local variable, so for passing the test it's
320     # enough if reading simply succeeds. OTOH, while we're at it we
321     # might as well check whether the results are equal anyway :)
322 bh 839 result1 = table.ReadRowAsDict(3)
323     result2 = table.ReadRowAsDict(3)
324 bh 785 self.assertEquals(result1, result2)
325    
326 bh 818
327 bh 842 def test_transient_table_query(self):
328     """Test TransientTable.SimpleQuery()"""
329     simple = MemoryTable([("type", FIELDTYPE_STRING),
330     ("value", FIELDTYPE_DOUBLE),
331     ("code", FIELDTYPE_INT)],
332     [("OTHER/UNKNOWN", -1.5, 11),
333     ("RUINS", 0.0, 1),
334     ("FARM", 3.141, 2),
335     ("BUILDING", 2.5, 3),
336     ("HUT", 1e6, 4),
337     ("LIGHTHOUSE", -0.01, 5)])
338     table = TransientTable(self.transientdb, simple)
339    
340     # A column and a value
341     self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
342     [1])
343     self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
344     [0, 1, 3, 4, 5])
345     self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
346     [0, 1, 5])
347     self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
348     [0])
349     self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
350     [0, 4, 5])
351     self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
352     [0, 3, 4, 5])
353    
354     # Two columns as operands
355     self.assertEquals(table.SimpleQuery(table.Column(1),
356     "<=", table.Column(2)),
357     [0, 1, 3, 5])
358    
359     # Test whether invalid operators raise a ValueError
360     self.assertRaises(ValueError,
361     table.SimpleQuery,
362     table.Column(1), "<<", table.Column(2))
363    
364    
365 bh 765 if __name__ == "__main__":
366     support.run_tests()

Properties

Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26