/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Contents of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory | Revision Log


Revision 1328 - (show annotations)
Tue Jul 1 12:01:58 2003 UTC (21 years, 8 months ago) by bh
Original Path: trunk/thuban/test/test_transientdb.py
File MIME type: text/x-python
File size: 13264 byte(s)
* test/test_transientdb.py
(TestTransientTable.test_transient_joined_table_same_column_name):
New. Test whether joining on columns with the same names in both
tables works.

* Thuban/Model/transientdb.py (TransientJoinedTable.create): Make
sure to use the right internal names even when joining on fields
with the same name in both tables. Also, detect duplicate names
in the joined table correctly.

1 # Copyright (c) 2002, 2003 by Intevation GmbH
2 # Authors:
3 # Bernhard Herzog <[email protected]>
4 #
5 # This program is free software under the GPL (>=v2)
6 # Read the file COPYING coming with Thuban for details.
7
8 """
9 Test the Transient DB classes
10 """
11
12 __version__ = "$Revision$"
13 # $Source$
14 # $Id$
15
16 import os
17 import unittest
18
19 import support
20 support.initthuban()
21
22 from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
23 FIELDTYPE_INT, FIELDTYPE_DOUBLE
24 from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
25 TransientJoinedTable, AutoTransientTable
26
27
class TestTransientTable(unittest.TestCase, support.FileTestMixin):

    """Test cases for the transient database table classes

    Exercises TransientTable, AutoTransientTable and
    TransientJoinedTable.  setUp creates a TransientDatabase as
    self.transientdb for every test and tearDown closes it again.
    """

    def setUp(self):
        """Create a transient database as self.transientdb"""
        filename = self.temp_file_name("transient_table.sqlite")
        # Remove files left over from earlier runs before the database
        # is created.
        if os.path.exists(filename):
            os.remove(filename)
        journal = filename + "-journal"
        if os.path.exists(journal):
            # Single-argument call form: this prints the same text
            # whether print is a statement (Python 2) or a function
            # (Python 3).
            print("removing journal %s" % journal)
            os.remove(journal)
        self.transientdb = TransientDatabase(filename)

    def tearDown(self):
        """Close the transient database created by setUp"""
        self.transientdb.close()

    def _open_iceland_dbf(self, basename):
        """Return a DBFTable for the file basename in ../Data/iceland"""
        return DBFTable(os.path.join("..", "Data", "iceland", basename))

    def _landmark_types_table(self):
        """Return a MemoryTable with a string 'type' and an int 'code' column

        The table has six rows with the codes 0 to 5.
        """
        return MemoryTable([("type", FIELDTYPE_STRING),
                            ("code", FIELDTYPE_INT)],
                           [("OTHER/UNKNOWN", 0),
                            ("RUINS", 1),
                            ("FARM", 2),
                            ("BUILDING", 3),
                            ("HUT", 4),
                            ("LIGHTHOUSE", 5)])

    def run_iceland_political_tests(self, table):
        """Run some tests on table

        Assume that table holds the data of the
        ../Data/iceland/political.dbf sample file.
        """
        self.assertEqual(table.NumRows(), 156)
        self.assertEqual(table.NumColumns(), 8)

        # Check one each of the possible field types.
        columns = table.Columns()
        for index, name, fieldtype in [(0, 'AREA', FIELDTYPE_DOUBLE),
                                       (3, 'PONET_ID', FIELDTYPE_INT),
                                       (6, 'POPYCOUN', FIELDTYPE_STRING)]:
            self.assertEqual(columns[index].name, name)
            self.assertEqual(columns[index].type, fieldtype)

        # HasColumn
        self.assertTrue(table.HasColumn("AREA"))
        self.assertTrue(table.HasColumn(1))
        # HasColumn for non-existing columns
        self.assertFalse(table.HasColumn("non_existing_name"))
        self.assertFalse(table.HasColumn(100))

        # Reading rows and values.
        expected_row = {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
                        'AREA': 19.462,
                        'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
                        'POPYREG': '1',
                        'PONET_ID': 145}
        self.assertEqual(table.ReadRowAsDict(144), expected_row)
        self.assertEqual(table.ReadValue(144, "AREA"), 19.462)
        self.assertEqual(table.ReadValue(144, 3), 145)

        # ValueRange may induce a copy to the transient database.
        # Therefore we put it last so that we can execute this method
        # twice to check whether the other methods still work after the
        # copy
        self.assertEqual(table.ValueRange("AREA"), (0.0, 19.462))

        unique = table.UniqueValues("PONET_ID")
        unique.sort()
        # list() because a Python 3 range object never equals a list.
        self.assertEqual(unique, list(range(1, 157)))

    def test_transient_table(self):
        """Test TransientTable(dbftable)

        The TransientTable should copy the data to the
        TransientDatabase.
        """
        orig_table = self._open_iceland_dbf("political.dbf")
        table = TransientTable(self.transientdb, orig_table)
        self.run_iceland_political_tests(table)

        # The transient_table method should return the table itself
        self.assertTrue(table is table.transient_table())

        # The title is simply copied over from the original table
        self.assertEqual(table.Title(), orig_table.Title())

        # The TransientTable class itself doesn't implement the
        # Dependencies method, so we don't test it.

    def test_auto_transient_table(self):
        """Test AutoTransientTable(dbftable)

        The AutoTransientTable should copy the data to the
        TransientDatabase on demand.
        """
        orig_table = self._open_iceland_dbf("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)

        # Run the tests twice so that we execute them once when the data
        # has not been copied to the transient db yet and once when it
        # has.  This assumes that run_iceland_political_tests does at
        # least one call to a method that copies to the transient db at
        # its end.
        self.run_iceland_political_tests(table)
        self.run_iceland_political_tests(table)

    def test_auto_transient_table_query(self):
        """Test AutoTransientTable.SimpleQuery()"""
        orig_table = self._open_iceland_dbf("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)
        # Only a simple test here.  The AutoTransientTable simply
        # delegates to its transient table so it should be OK that the
        # real test for it is in test_transient_table_query.  However,
        # it's important to check that the column handling works
        # correctly because the AutoTransientTable and its underlying
        # transient table use different column object types.
        self.assertEqual(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
                         [144])

        # test using a Column object as the right parameter
        self.assertEqual(table.SimpleQuery(table.Column("POPYTYPE"),
                                           "==",
                                           table.Column("POPYREG")),
                         list(range(156)))

    def test_auto_transient_table_dependencies(self):
        """Test AutoTransientTable.Dependencies()"""
        orig_table = self._open_iceland_dbf("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)
        self.assertEqual(table.Dependencies(), (orig_table,))

    def test_auto_transient_table_title(self):
        """Test AutoTransientTable.Title()"""
        orig_table = self._open_iceland_dbf("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)
        # The title is of course the same as that of the original table
        self.assertEqual(table.Title(), orig_table.Title())

    def test_transient_joined_table(self):
        """Test TransientJoinedTable"""
        auto = AutoTransientTable(self.transientdb,
                                  self._landmark_types_table())
        landmarks = AutoTransientTable(
            self.transientdb,
            self._open_iceland_dbf("cultural_landmark-point.dbf"))

        table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
                                     auto, "type")

        self.assertEqual(table.NumRows(), 34)
        self.assertEqual(table.NumColumns(), 8)
        for index, fieldtype, name in [(0, FIELDTYPE_DOUBLE, 'AREA'),
                                       (7, FIELDTYPE_INT, 'code'),
                                       (4, FIELDTYPE_STRING, 'CLPTLABEL')]:
            self.assertEqual(table.Column(index).type, fieldtype)
            self.assertEqual(table.Column(index).name, name)

        # HasColumn
        self.assertTrue(table.HasColumn("AREA"))
        self.assertTrue(table.HasColumn(1))
        # HasColumn for non-existing columns
        self.assertFalse(table.HasColumn("non_existing_name"))
        self.assertFalse(table.HasColumn(100))

        # Reading rows and values
        expected_row = {'PERIMETER': 0.0, 'CLPOINT_': 23,
                        'AREA': 0.0, 'CLPTLABEL': 'RUINS',
                        'CLPOINT_ID': 38, 'CLPTFLAG': 0,
                        'code': 1, 'type': 'RUINS'}
        self.assertEqual(table.ReadRowAsDict(22), expected_row)
        self.assertEqual(table.ReadValue(22, "type"), 'RUINS')
        self.assertEqual(table.ReadValue(22, 7), 1)

        # The transient_table method should return the table itself
        self.assertTrue(table is table.transient_table())

        # The TransientJoinedTable depends on both input tables
        self.assertEqual(table.Dependencies(), (landmarks, auto))

        # The title is constructed from the titles of the input tables.
        self.assertEqual(table.Title(),
                         "Join of %s and %s" % (landmarks.Title(),
                                                auto.Title()))

    def test_transient_joined_table_same_column_name(self):
        """Test TransientJoinedTable join on columns with same name

        The transient DB maps the column names used by the tables to
        another set of names used only inside the SQLite database.  There
        was a bug in the way this mapping was used when joining on
        fields with the same names in both tables so that the joined
        table ended up joining on the same column in the same table.
        """
        mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
                                    [(i,) for i in range(4)])
        stretches = AutoTransientTable(self.transientdb, mem_stretches)

        mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
                                      ("stretch_id", FIELDTYPE_INT)],
                                     [(1, 0), (2, 3)])
        discharges = AutoTransientTable(self.transientdb, mem_discharges)

        table = TransientJoinedTable(self.transientdb,
                                     stretches, "stretch_id",
                                     discharges, "stretch_id",
                                     outer_join=True)

        self.assertEqual(table.NumRows(), 4)
        self.assertEqual(table.NumColumns(), 2)

        # HasColumn
        self.assertTrue(table.HasColumn("stretch_id"))
        self.assertTrue(table.HasColumn("disch_id"))

    def test_transient_table_read_twice(self):
        """Test TransientTable.ReadRowAsDict() reading the same record twice"""
        table = TransientTable(self.transientdb,
                               self._landmark_types_table())

        # There was a bug where reading the same record twice would
        # raise an exception in the second call because of an
        # uninitialized local variable, so for passing the test it's
        # enough if reading simply succeeds.  OTOH, while we're at it we
        # might as well check whether the results are equal anyway :)
        result1 = table.ReadRowAsDict(3)
        result2 = table.ReadRowAsDict(3)
        self.assertEqual(result1, result2)

    def test_transient_table_query(self):
        """Test TransientTable.SimpleQuery()"""
        simple = MemoryTable([("type", FIELDTYPE_STRING),
                              ("value", FIELDTYPE_DOUBLE),
                              ("code", FIELDTYPE_INT)],
                             [("OTHER/UNKNOWN", -1.5, 11),
                              ("RUINS", 0.0, 1),
                              ("FARM", 3.141, 2),
                              ("BUILDING", 2.5, 3),
                              ("HUT", 1e6, 4),
                              ("LIGHTHOUSE", -0.01, 5)])
        table = TransientTable(self.transientdb, simple)

        # A column and a value
        self.assertEqual(table.SimpleQuery(table.Column(0), "==", "RUINS"),
                         [1])
        self.assertEqual(table.SimpleQuery(table.Column(2), "!=", 2),
                         [0, 1, 3, 4, 5])
        self.assertEqual(table.SimpleQuery(table.Column(1), "<", 1.0),
                         [0, 1, 5])
        self.assertEqual(table.SimpleQuery(table.Column(1), "<=", -1.5),
                         [0])
        self.assertEqual(table.SimpleQuery(table.Column(2), ">", 3),
                         [0, 4, 5])
        self.assertEqual(table.SimpleQuery(table.Column(2), ">=", 3),
                         [0, 3, 4, 5])

        # Two columns as operands
        self.assertEqual(table.SimpleQuery(table.Column(1),
                                           "<=", table.Column(2)),
                         [0, 1, 3, 5])

        # Test whether invalid operators raise a ValueError
        self.assertRaises(ValueError,
                          table.SimpleQuery,
                          table.Column(1), "<<", table.Column(2))
306
# Run the tests through the support module's runner when this file is
# executed as a script.
if __name__ == "__main__":
    support.run_tests()

Properties

Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26