/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Annotation of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory | Revision Log


Revision 1328 - (hide annotations)
Tue Jul 1 12:01:58 2003 UTC (21 years, 8 months ago) by bh
Original Path: trunk/thuban/test/test_transientdb.py
File MIME type: text/x-python
File size: 13264 byte(s)
* test/test_transientdb.py
(TestTransientTable.test_transient_joined_table_same_column_name):
New. Test whether joining on columns with the same names in both
tables works.

* Thuban/Model/transientdb.py (TransientJoinedTable.create): Make
sure to use the right internal names even when joining on fields
with the same name in both tables. Also, detect duplicate names
in the joined table correctly.

1 bh 765 # Copyright (c) 2002, 2003 by Intevation GmbH
2     # Authors:
3     # Bernhard Herzog <[email protected]>
4     #
5     # This program is free software under the GPL (>=v2)
6     # Read the file COPYING coming with Thuban for details.
7    
8     """
9     Test the Transient DB classes
10     """
11    
12     __version__ = "$Revision$"
13     # $Source$
14     # $Id$
15    
16     import os
17     import unittest
18    
19     import support
20     support.initthuban()
21    
22 jan 805 from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
23 bh 839 FIELDTYPE_INT, FIELDTYPE_DOUBLE
24 bh 765 from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
25     TransientJoinedTable, AutoTransientTable
26    
27    
28     class TestTransientTable(unittest.TestCase, support.FileTestMixin):
29    
30     def setUp(self):
31     """Create a transient database as self.transientdb"""
32     filename = self.temp_file_name("transient_table.sqlite")
33     if os.path.exists(filename):
34     os.remove(filename)
35     journal = filename + "-journal"
36     if os.path.exists(journal):
37     print "removing journal", journal
38     os.remove(journal)
39     self.transientdb = TransientDatabase(filename)
40    
41     def tearDown(self):
42     self.transientdb.close()
43    
44     def run_iceland_political_tests(self, table):
45     """Run some tests on table
46    
47     Assume that table holds the data of the sample file
48     ../Data/iceland/political.dbf.
49     """
50 bh 818 self.assertEquals(table.NumRows(), 156)
51     self.assertEquals(table.NumColumns(), 8)
52 bh 765
53 bh 1041 # Check one column of each of the possible field types.
54 bh 818 columns = table.Columns()
55     self.assertEquals(columns[0].name, 'AREA')
56 bh 839 self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
57 bh 818 self.assertEquals(columns[3].name, 'PONET_ID')
58 bh 839 self.assertEquals(columns[3].type, FIELDTYPE_INT)
59 bh 818 self.assertEquals(columns[6].name, 'POPYCOUN')
60 bh 839 self.assertEquals(columns[6].type, FIELDTYPE_STRING)
61 bh 765
62 bh 839 # HasColumn
63     self.failUnless(table.HasColumn("AREA"))
64     self.failUnless(table.HasColumn(1))
65     # HasColumn for non-existing columns
66     self.failIf(table.HasColumn("non_existing_name"))
67     self.failIf(table.HasColumn(100))
68    
69 bh 849 # Reading rows and values.
70 bh 818 self.assertEquals(table.ReadRowAsDict(144),
71 bh 765 {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
72     'AREA': 19.462,
73     'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
74     'POPYREG': '1',
75     'PONET_ID': 145})
76 bh 849 self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
77     self.assertEquals(table.ReadValue(144, 3), 145)
78 bh 765
79 bh 839 # ValueRange may induce a copy to the transient database.
80 bh 765 # Therefore we put it last so that we can execute this method
81     # twice to check whether the other methods still work after the
82     # copy
83 bh 818 self.assertEquals(table.ValueRange("AREA"), (0.0, 19.462))
84 bh 765
85 bh 818 unique = table.UniqueValues("PONET_ID")
86 bh 765 unique.sort()
87     self.assertEquals(unique, range(1, 157))
88    
89     def test_transient_table(self):
90     """Test TransientTable(dbftable)
91    
92     The TransientTable should copy the data to the
93     TransientDatabase.
94     """
95     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
96     "political.dbf"))
97     table = TransientTable(self.transientdb, orig_table)
98     self.run_iceland_political_tests(table)
99    
100     # The transient_table method should return the table itself
101     self.assert_(table is table.transient_table())
102    
103 bh 998 # The title is simply copied over from the original table
104     self.assertEquals(table.Title(), orig_table.Title())
105    
106 bh 984 # The TransientTable class itself doesn't implement the
107     # Dependencies method, so we don't test it.
108 bh 765
109 bh 984
110 bh 765 def test_auto_transient_table(self):
111     """Test AutoTransientTable(dbftable)
112    
113     The AutoTransientTable should copy the data to the
114     TransientDatabase on demand.
115     """
116     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
117     "political.dbf"))
118     table = AutoTransientTable(self.transientdb, orig_table)
119    
120     # Run the tests twice so that we execute them once when the data
121     # has not been copied to the transient db yet and once when it
122     # has. This assumes that run_iceland_political_tests does at
123     # least one call to a method that copies to the transient db at
124     # its end.
125     self.run_iceland_political_tests(table)
126     self.run_iceland_political_tests(table)
127    
128 bh 845 def test_auto_transient_table_query(self):
129     """Test AutoTransientTable.SimpleQuery()"""
130     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
131     "political.dbf"))
132     table = AutoTransientTable(self.transientdb, orig_table)
133 jonathan 948 # Only a simple test here. The AutoTransientTable simply
134 bh 845 # delegates to its transient table so it should be OK that the
135     # real test for it is in test_transient_table_query. However,
136     # it's important to check that the column handling works
137     # correctly because the AutoTransientTable and its underlying
138     # transient table use different column object types.
139     self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
140     [144])
141 bh 765
142 jonathan 948 # test using a Column object as the right parameter
143 bh 984 self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
144 jonathan 948 "==",
145     table.Column("POPYREG")),
146     range(156))
147    
148 bh 984 def test_auto_transient_table_dependencies(self):
149     """Test AutoTransientTable.Dependencies()"""
150     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
151     "political.dbf"))
152     table = AutoTransientTable(self.transientdb, orig_table)
153     self.assertEquals(table.Dependencies(), (orig_table,))
154    
155 bh 998 def test_auto_transient_table_title(self):
156     """Test AutoTransientTable.Title()"""
157     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
158     "political.dbf"))
159     table = AutoTransientTable(self.transientdb, orig_table)
160     # The title is of course the same as that of the original table
161     self.assertEquals(table.Title(), orig_table.Title())
162    
163 bh 765 def test_transient_joined_table(self):
164     """Test TransientJoinedTable"""
165 jan 805 simple = MemoryTable([("type", FIELDTYPE_STRING),
166 bh 765 ("code", FIELDTYPE_INT)],
167     [("OTHER/UNKNOWN", 0),
168     ("RUINS", 1),
169     ("FARM", 2),
170     ("BUILDING", 3),
171     ("HUT", 4),
172     ("LIGHTHOUSE", 5)])
173     auto = AutoTransientTable(self.transientdb, simple)
174     filename = os.path.join("..", "Data", "iceland",
175     "cultural_landmark-point.dbf")
176     landmarks = AutoTransientTable(self.transientdb, DBFTable(filename))
177    
178     table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
179     auto, "type")
180    
181 bh 818 self.assertEquals(table.NumRows(), 34)
182     self.assertEquals(table.NumColumns(), 8)
183 bh 839 self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
184     self.assertEquals(table.Column(0).name, 'AREA')
185     self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
186     self.assertEquals(table.Column(7).name, 'code')
187     self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
188     self.assertEquals(table.Column(4).name, 'CLPTLABEL')
189     # HasColumn
190     self.failUnless(table.HasColumn("AREA"))
191     self.failUnless(table.HasColumn(1))
192     # HasColumn for non-existing columns
193     self.failIf(table.HasColumn("non_existing_name"))
194     self.failIf(table.HasColumn(100))
195 bh 765
196 bh 849 # Reading rows and values
197 bh 839 self.assertEquals(table.ReadRowAsDict(22),
198 bh 765 {'PERIMETER': 0.0, 'CLPOINT_': 23,
199     'AREA': 0.0, 'CLPTLABEL': 'RUINS',
200     'CLPOINT_ID': 38, 'CLPTFLAG': 0,
201     'code': 1, 'type': 'RUINS'})
202 bh 849 self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
203     self.assertEquals(table.ReadValue(22, 7), 1)
204 bh 765
205     # The transient_table method should return the table itself
206     self.assert_(table is table.transient_table())
207    
208 bh 984 # The TransientJoinedTable depends on both input tables
209     self.assertEquals(table.Dependencies(), (landmarks, auto))
210 bh 765
211 bh 998 # The title is constructed from the titles of the input tables.
212     self.assertEquals(table.Title(),
213     "Join of %s and %s" % (landmarks.Title(),
214     auto.Title()))
215 bh 984
216 bh 998
217 bh 1328 def test_transient_joined_table_same_column_name(self):
218     """Test TransientJoinedTable join on columns with same name
219    
220     The transient DB maps the column names used by the tables to
221     another set of names used only inside the SQLite database. There
222     was a bug in the way this mapping was used when joining on
223     fields with the same names in both tables so that the joined
224     table ended up joining on the same column in the same table.
225     """
226     mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
227     [(i,) for i in range(4)])
228     stretches = AutoTransientTable(self.transientdb, mem_stretches)
229    
230     mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
231     ("stretch_id", FIELDTYPE_INT)],
232     [(1, 0), (2, 3)])
233     discharges = AutoTransientTable(self.transientdb, mem_discharges)
234    
235     table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
236     discharges, "stretch_id",
237     outer_join = True)
238    
239     self.assertEquals(table.NumRows(), 4)
240     self.assertEquals(table.NumColumns(), 2)
241    
242     # HasColumn
243     self.failUnless(table.HasColumn("stretch_id"))
244     self.failUnless(table.HasColumn("disch_id"))
245    
246    
247 bh 785 def test_transient_table_read_twice(self):
248 bh 842 """Test TransientTable.ReadRowAsDict() reading the same record twice"""
249 jan 805 simple = MemoryTable([("type", FIELDTYPE_STRING),
250 bh 785 ("code", FIELDTYPE_INT)],
251     [("OTHER/UNKNOWN", 0),
252     ("RUINS", 1),
253     ("FARM", 2),
254     ("BUILDING", 3),
255     ("HUT", 4),
256     ("LIGHTHOUSE", 5)])
257     table = TransientTable(self.transientdb, simple)
258 bh 765
259 bh 785 # There was a bug where reading the same record twice would
260     # raise an exception in the second call because of an
261     # uninitialized local variable, so for passing the test it's
262     # enough if reading simply succeeds. OTOH, while we're at it we
263     # might as well check whether the results are equal anyway :)
264 bh 839 result1 = table.ReadRowAsDict(3)
265     result2 = table.ReadRowAsDict(3)
266 bh 785 self.assertEquals(result1, result2)
267    
268 bh 818
269 bh 842 def test_transient_table_query(self):
270     """Test TransientTable.SimpleQuery()"""
271     simple = MemoryTable([("type", FIELDTYPE_STRING),
272     ("value", FIELDTYPE_DOUBLE),
273     ("code", FIELDTYPE_INT)],
274     [("OTHER/UNKNOWN", -1.5, 11),
275     ("RUINS", 0.0, 1),
276     ("FARM", 3.141, 2),
277     ("BUILDING", 2.5, 3),
278     ("HUT", 1e6, 4),
279     ("LIGHTHOUSE", -0.01, 5)])
280     table = TransientTable(self.transientdb, simple)
281    
282     # A column and a value
283     self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
284     [1])
285     self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
286     [0, 1, 3, 4, 5])
287     self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
288     [0, 1, 5])
289     self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
290     [0])
291     self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
292     [0, 4, 5])
293     self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
294     [0, 3, 4, 5])
295    
296     # Two columns as operands
297     self.assertEquals(table.SimpleQuery(table.Column(1),
298     "<=", table.Column(2)),
299     [0, 1, 3, 5])
300    
301     # Test whether invalid operators raise a ValueError
302     self.assertRaises(ValueError,
303     table.SimpleQuery,
304     table.Column(1), "<<", table.Column(2))
305    
306    
307 bh 765 if __name__ == "__main__":
308     support.run_tests()

Properties

Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26