/[thuban]/branches/greater-ms3/thuban/test/test_transientdb.py
ViewVC logotype

Annotation of /branches/greater-ms3/thuban/test/test_transientdb.py

Parent Directory | Revision Log


Revision 1330 - (hide annotations)
Tue Jul 1 15:28:28 2003 UTC (21 years, 8 months ago) by frank
File MIME type: text/x-python
File size: 14852 byte(s)
(TestTransientTable.test_transient_joined_table_same_column_name):
	New. Test whether joining on columns with the same names in both
	tables works.
(TestTransientTable.test_transient_joined_table_with_equal_column_names):
	New. Test join of two tables with partly equal column names.

1 bh 765 # Copyright (c) 2002, 2003 by Intevation GmbH
2     # Authors:
3     # Bernhard Herzog <[email protected]>
4     #
5     # This program is free software under the GPL (>=v2)
6     # Read the file COPYING coming with Thuban for details.
7    
8     """
9     Test the Transient DB classes
10     """
11    
12     __version__ = "$Revision$"
13     # $Source$
14     # $Id$
15    
16     import os
17     import unittest
18    
19     import support
20     support.initthuban()
21    
22 jan 805 from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
23 bh 839 FIELDTYPE_INT, FIELDTYPE_DOUBLE
24 bh 765 from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
25     TransientJoinedTable, AutoTransientTable
26    
27    
28     class TestTransientTable(unittest.TestCase, support.FileTestMixin):
29    
30     def setUp(self):
31     """Create a transient database as self.transientdb"""
32     filename = self.temp_file_name("transient_table.sqlite")
33     if os.path.exists(filename):
34     os.remove(filename)
35     journal = filename + "-journal"
36     if os.path.exists(journal):
37     print "removing journal", journal
38     os.remove(journal)
39     self.transientdb = TransientDatabase(filename)
40    
41     def tearDown(self):
42     self.transientdb.close()
43    
44     def run_iceland_political_tests(self, table):
45     """Run some tests on table
46    
47     Assume that table holds the data of the file
48     ../Data/iceland/political.dbf sample file.
49     """
50 bh 818 self.assertEquals(table.NumRows(), 156)
51     self.assertEquals(table.NumColumns(), 8)
52 bh 765
53 bh 1041 # Check one each of the possible field types.
54 bh 818 columns = table.Columns()
55     self.assertEquals(columns[0].name, 'AREA')
56 bh 839 self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
57 bh 818 self.assertEquals(columns[3].name, 'PONET_ID')
58 bh 839 self.assertEquals(columns[3].type, FIELDTYPE_INT)
59 bh 818 self.assertEquals(columns[6].name, 'POPYCOUN')
60 bh 839 self.assertEquals(columns[6].type, FIELDTYPE_STRING)
61 bh 765
62 bh 839 # HasColumn
63     self.failUnless(table.HasColumn("AREA"))
64     self.failUnless(table.HasColumn(1))
65     # HasColumn for non-existing columns
66     self.failIf(table.HasColumn("non_existing_name"))
67     self.failIf(table.HasColumn(100))
68    
69 bh 849 # Reading rows and values.
70 bh 818 self.assertEquals(table.ReadRowAsDict(144),
71 bh 765 {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
72     'AREA': 19.462,
73     'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
74     'POPYREG': '1',
75     'PONET_ID': 145})
76 bh 849 self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
77     self.assertEquals(table.ReadValue(144, 3), 145)
78 bh 765
79 bh 839 # ValueRange may induce a copy to the transient database.
80 bh 765 # Therefore we put it last so that we can execute this method
81     # twice to check whether the other methods still work after the
82     # copy
83 bh 818 self.assertEquals(table.ValueRange("AREA"), (0.0, 19.462))
84 bh 765
85 bh 818 unique = table.UniqueValues("PONET_ID")
86 bh 765 unique.sort()
87     self.assertEquals(unique, range(1, 157))
88    
89     def test_transient_table(self):
90     """Test TransientTable(dbftable)
91    
92     The TransientTable should copy the data to the
93     TransientDatabase.
94     """
95     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
96     "political.dbf"))
97     table = TransientTable(self.transientdb, orig_table)
98     self.run_iceland_political_tests(table)
99    
100     # The transient_table method should return the table itself
101     self.assert_(table is table.transient_table())
102    
103 bh 998 # The title is simply copied over from the original table
104     self.assertEquals(table.Title(), orig_table.Title())
105    
106 bh 984 # The TransientTable class itself doesn't implement the
107     # Dependencies method, so we don't test it.
108 bh 765
109 bh 984
110 bh 765 def test_auto_transient_table(self):
111     """Test AutoTransientTable(dbftable)
112    
113     The AutoTransientTable should copy the data to the
114     TransientDatabase on demand.
115     """
116     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
117     "political.dbf"))
118     table = AutoTransientTable(self.transientdb, orig_table)
119    
120     # Run the tests twice so that we execute them once when the data
121     # has not been copied to the transient db yet and once when it
122     # has. This assumes that run_iceland_political_tests does at
123     # least one call to a method that copies to the transient db at
124     # its end.
125     self.run_iceland_political_tests(table)
126     self.run_iceland_political_tests(table)
127    
128 bh 845 def test_auto_transient_table_query(self):
129     """Test AutoTransientTable.SimpleQuery()"""
130     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
131     "political.dbf"))
132     table = AutoTransientTable(self.transientdb, orig_table)
133 jonathan 948 # Only a simple test here. The AutoTransientTable simply
134 bh 845 # delegates to its transient table so it should be OK that the
135     # real test for it is in test_transient_table_query. However,
136     # it's important to check that the column handling works
137     # correctly because the AutoTransientTable and its underlying
138     # transient table use different column object types.
139     self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
140     [144])
141 bh 765
142 jonathan 948 # test using a Column object as the right parameter
143 bh 984 self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
144 jonathan 948 "==",
145     table.Column("POPYREG")),
146     range(156))
147    
148 bh 984 def test_auto_transient_table_dependencies(self):
149     """Test AutoTransientTable.Dependencies()"""
150     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
151     "political.dbf"))
152     table = AutoTransientTable(self.transientdb, orig_table)
153     self.assertEquals(table.Dependencies(), (orig_table,))
154    
155 bh 998 def test_auto_transient_table_title(self):
156     """Test AutoTransientTable.Title()"""
157     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
158     "political.dbf"))
159     table = AutoTransientTable(self.transientdb, orig_table)
160     # The title is of course the same as that of the original table
161     self.assertEquals(table.Title(), orig_table.Title())
162    
163 bh 765 def test_transient_joined_table(self):
164     """Test TransientJoinedTable"""
165 jan 805 simple = MemoryTable([("type", FIELDTYPE_STRING),
166 bh 765 ("code", FIELDTYPE_INT)],
167     [("OTHER/UNKNOWN", 0),
168     ("RUINS", 1),
169     ("FARM", 2),
170     ("BUILDING", 3),
171     ("HUT", 4),
172     ("LIGHTHOUSE", 5)])
173     auto = AutoTransientTable(self.transientdb, simple)
174     filename = os.path.join("..", "Data", "iceland",
175     "cultural_landmark-point.dbf")
176     landmarks = AutoTransientTable(self.transientdb, DBFTable(filename))
177    
178     table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
179     auto, "type")
180    
181 bh 818 self.assertEquals(table.NumRows(), 34)
182     self.assertEquals(table.NumColumns(), 8)
183 bh 839 self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
184     self.assertEquals(table.Column(0).name, 'AREA')
185     self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
186     self.assertEquals(table.Column(7).name, 'code')
187     self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
188     self.assertEquals(table.Column(4).name, 'CLPTLABEL')
189     # HasColumn
190     self.failUnless(table.HasColumn("AREA"))
191     self.failUnless(table.HasColumn(1))
192     # HasColumn for non-existing columns
193     self.failIf(table.HasColumn("non_existing_name"))
194     self.failIf(table.HasColumn(100))
195 bh 765
196 bh 849 # Reading rows and values
197 bh 839 self.assertEquals(table.ReadRowAsDict(22),
198 bh 765 {'PERIMETER': 0.0, 'CLPOINT_': 23,
199     'AREA': 0.0, 'CLPTLABEL': 'RUINS',
200     'CLPOINT_ID': 38, 'CLPTFLAG': 0,
201     'code': 1, 'type': 'RUINS'})
202 bh 849 self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
203     self.assertEquals(table.ReadValue(22, 7), 1)
204 bh 765
205     # The transient_table method should return the table itself
206     self.assert_(table is table.transient_table())
207    
208 bh 984 # The TransientJoinedTable depends on both input tables
209     self.assertEquals(table.Dependencies(), (landmarks, auto))
210 bh 765
211 bh 998 # The title is constructed from the titles of the input tables.
212     self.assertEquals(table.Title(),
213     "Join of %s and %s" % (landmarks.Title(),
214     auto.Title()))
215 bh 984
216 bh 998
217 frank 1330 def test_transient_joined_table_same_column_name(self):
218     """Test TransientJoinedTable join on columns with same name
219    
220     The transient DB maps the column names used by the tables to
221     another set of names used only inside the SQLite database. There
222     was a bug in the way this mapping was used when joining on
223     fields with the same names in both tables so that the joined
224     table ended up joining on the same column in the same table.
225     """
226     mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
227     [(i,) for i in range(4)])
228     stretches = AutoTransientTable(self.transientdb, mem_stretches)
229    
230     mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
231     ("stretch_id", FIELDTYPE_INT)],
232     [(1, 0), (2, 3)])
233     discharges = AutoTransientTable(self.transientdb, mem_discharges)
234    
235     table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
236     discharges, "stretch_id",
237     outer_join = True)
238    
239     self.assertEquals(table.NumRows(), 4)
240     self.assertEquals(table.NumColumns(), 2)
241    
242     # HasColumn
243     self.failUnless(table.HasColumn("stretch_id"))
244     self.failUnless(table.HasColumn("disch_id"))
245    
246    
247     def test_transient_joined_table_with_equal_column_names(self):
248     """Test TransientJoinedTable join on tables with equal column names
249    
250     The join of two tables contains all fields from both tables except
251     the field the join was performed on. This special field is included
252     once. If a name collision occurs for the field names, underscores are
253     appended until every collision is resolved.
254     """
255     mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
256     ("name", FIELDTYPE_INT)],
257     [(0, 10), (1, 11), (2, 12), (3, 13) ])
258     stretches = AutoTransientTable(self.transientdb, mem_stretches)
259    
260     mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
261     ("stretch_id", FIELDTYPE_INT),
262     ("name", FIELDTYPE_INT)],
263     [(1, 0, 1), (2, 3, 2)])
264     discharges = AutoTransientTable(self.transientdb, mem_discharges)
265    
266     table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
267     discharges, "stretch_id",
268     outer_join = True)
269    
270     self.assertEquals(table.NumRows(), 4)
271     self.assertEquals(table.NumColumns(), 4)
272    
273     # HasColumn
274     self.failUnless(table.HasColumn("stretch_id"))
275     self.failUnless(table.HasColumn("disch_id"))
276     self.failUnless(table.HasColumn("name"))
277     self.failUnless(table.HasColumn("name_"))
278    
279    
280 bh 785 def test_transient_table_read_twice(self):
281 bh 842 """Test TransientTable.ReadRowAsDict() reading the same record twice"""
282 jan 805 simple = MemoryTable([("type", FIELDTYPE_STRING),
283 bh 785 ("code", FIELDTYPE_INT)],
284     [("OTHER/UNKNOWN", 0),
285     ("RUINS", 1),
286     ("FARM", 2),
287     ("BUILDING", 3),
288     ("HUT", 4),
289     ("LIGHTHOUSE", 5)])
290     table = TransientTable(self.transientdb, simple)
291 bh 765
292 bh 785 # There was a bug where reading the same record twice would
293     # raise an exception in the second call because of an
294     # uninitialized local variable, so for passing the test it's
295     # enough if reading simply succeeds. OTOH, while we're at it we
296     # might as well check whether the results are equal anyway :)
297 bh 839 result1 = table.ReadRowAsDict(3)
298     result2 = table.ReadRowAsDict(3)
299 bh 785 self.assertEquals(result1, result2)
300    
301 bh 818
302 bh 842 def test_transient_table_query(self):
303     """Test TransientTable.SimpleQuery()"""
304     simple = MemoryTable([("type", FIELDTYPE_STRING),
305     ("value", FIELDTYPE_DOUBLE),
306     ("code", FIELDTYPE_INT)],
307     [("OTHER/UNKNOWN", -1.5, 11),
308     ("RUINS", 0.0, 1),
309     ("FARM", 3.141, 2),
310     ("BUILDING", 2.5, 3),
311     ("HUT", 1e6, 4),
312     ("LIGHTHOUSE", -0.01, 5)])
313     table = TransientTable(self.transientdb, simple)
314    
315     # A column and a value
316     self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
317     [1])
318     self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
319     [0, 1, 3, 4, 5])
320     self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
321     [0, 1, 5])
322     self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
323     [0])
324     self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
325     [0, 4, 5])
326     self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
327     [0, 3, 4, 5])
328    
329     # Two columns as operands
330     self.assertEquals(table.SimpleQuery(table.Column(1),
331     "<=", table.Column(2)),
332     [0, 1, 3, 5])
333    
334     # Test whether invalid operators raise a ValueError
335     self.assertRaises(ValueError,
336     table.SimpleQuery,
337     table.Column(1), "<<", table.Column(2))
338    
339    
340 bh 765 if __name__ == "__main__":
341     support.run_tests()

Properties

Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26