/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Contents of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1332 - (show annotations)
Tue Jul 1 15:40:52 2003 UTC (21 years, 8 months ago) by frank
Original Path: trunk/thuban/test/test_transientdb.py
File MIME type: text/x-python
File size: 14852 byte(s)
(TestTransientTable.test_transient_joined_table_with_equal_column_names):
	New. Test join of two tables with partly equal column names.

1 # Copyright (c) 2002, 2003 by Intevation GmbH
2 # Authors:
3 # Bernhard Herzog <[email protected]>
4 #
5 # This program is free software under the GPL (>=v2)
6 # Read the file COPYING coming with Thuban for details.
7
8 """
9 Test the Transient DB classes
10 """
11
12 __version__ = "$Revision$"
13 # $Source$
14 # $Id$
15
16 import os
17 import unittest
18
19 import support
20 support.initthuban()
21
22 from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
23 FIELDTYPE_INT, FIELDTYPE_DOUBLE
24 from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
25 TransientJoinedTable, AutoTransientTable
26
27
28 class TestTransientTable(unittest.TestCase, support.FileTestMixin):
29
30 def setUp(self):
31 """Create a transient database as self.transientdb"""
32 filename = self.temp_file_name("transient_table.sqlite")
33 if os.path.exists(filename):
34 os.remove(filename)
35 journal = filename + "-journal"
36 if os.path.exists(journal):
37 print "removing journal", journal
38 os.remove(journal)
39 self.transientdb = TransientDatabase(filename)
40
41 def tearDown(self):
42 self.transientdb.close()
43
44 def run_iceland_political_tests(self, table):
45 """Run some tests on tablte
46
47 Assume that table holds the data of the file
48 ../Data/iceland/political.dbf sample file.
49 """
50 self.assertEquals(table.NumRows(), 156)
51 self.assertEquals(table.NumColumns(), 8)
52
53 # Check one each of the possible field types.
54 columns = table.Columns()
55 self.assertEquals(columns[0].name, 'AREA')
56 self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
57 self.assertEquals(columns[3].name, 'PONET_ID')
58 self.assertEquals(columns[3].type, FIELDTYPE_INT)
59 self.assertEquals(columns[6].name, 'POPYCOUN')
60 self.assertEquals(columns[6].type, FIELDTYPE_STRING)
61
62 # HasColumn
63 self.failUnless(table.HasColumn("AREA"))
64 self.failUnless(table.HasColumn(1))
65 # HasColumn for non-exisiting columns
66 self.failIf(table.HasColumn("non_existing_name"))
67 self.failIf(table.HasColumn(100))
68
69 # Reading rows and values.
70 self.assertEquals(table.ReadRowAsDict(144),
71 {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
72 'AREA': 19.462,
73 'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
74 'POPYREG': '1',
75 'PONET_ID': 145})
76 self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
77 self.assertEquals(table.ReadValue(144, 3), 145)
78
79 # ValueRange may induce a copy to the transient database.
80 # Therefore we put it last so that we can execute this method
81 # twice to check whether the other methods still work after the
82 # copy
83 self.assertEquals(table.ValueRange("AREA"), (0.0, 19.462))
84
85 unique = table.UniqueValues("PONET_ID")
86 unique.sort()
87 self.assertEquals(unique, range(1, 157))
88
89 def test_transient_table(self):
90 """Test TransientTable(dbftable)
91
92 The TransientTable should copy the data to the
93 TransientDatabase.
94 """
95 orig_table = DBFTable(os.path.join("..", "Data", "iceland",
96 "political.dbf"))
97 table = TransientTable(self.transientdb, orig_table)
98 self.run_iceland_political_tests(table)
99
100 # The transient_table method should return the table itself
101 self.assert_(table is table.transient_table())
102
103 # The title is simply copied over from the original table
104 self.assertEquals(table.Title(), orig_table.Title())
105
106 # The TransientTable class itself doesn't implement the
107 # Dependencies method, so we don't test it.
108
109
110 def test_auto_transient_table(self):
111 """Test AutoTransientTable(dbftable)
112
113 The AutoTransientTable should copy the data to the
114 TransientDatabase on demand.
115 """
116 orig_table = DBFTable(os.path.join("..", "Data", "iceland",
117 "political.dbf"))
118 table = AutoTransientTable(self.transientdb, orig_table)
119
120 # Run the tests twice so that we execute them once when the data
121 # has not been copied to the transient db yet and once when it
122 # has. This assumes that run_iceland_political_tests does at
123 # least one call to a method that copies to the transient db at
124 # its end.
125 self.run_iceland_political_tests(table)
126 self.run_iceland_political_tests(table)
127
128 def test_auto_transient_table_query(self):
129 """Test AutoTransientTable.SimpleQuery()"""
130 orig_table = DBFTable(os.path.join("..", "Data", "iceland",
131 "political.dbf"))
132 table = AutoTransientTable(self.transientdb, orig_table)
133 # Only a simple test here. The AutoTransientTable simply
134 # delegates to its transient table so it should be OK that the
135 # real test for it is in test_transient_table_query. However,
136 # it's important to check that the column handling works
137 # correctly because the AutoTransientTable and it's underlying
138 # transient table use different column object types.
139 self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
140 [144])
141
142 # test using a Column object as the right parameter
143 self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
144 "==",
145 table.Column("POPYREG")),
146 range(156))
147
148 def test_auto_transient_table_dependencies(self):
149 """Test AutoTransientTable.Dependencies()"""
150 orig_table = DBFTable(os.path.join("..", "Data", "iceland",
151 "political.dbf"))
152 table = AutoTransientTable(self.transientdb, orig_table)
153 self.assertEquals(table.Dependencies(), (orig_table,))
154
155 def test_auto_transient_table_title(self):
156 """Test AutoTransientTable.Title()"""
157 orig_table = DBFTable(os.path.join("..", "Data", "iceland",
158 "political.dbf"))
159 table = AutoTransientTable(self.transientdb, orig_table)
160 # The title is of course the same as that of the original table
161 self.assertEquals(table.Title(), orig_table.Title())
162
163 def test_transient_joined_table(self):
164 """Test TransientJoinedTable"""
165 simple = MemoryTable([("type", FIELDTYPE_STRING),
166 ("code", FIELDTYPE_INT)],
167 [("OTHER/UNKNOWN", 0),
168 ("RUINS", 1),
169 ("FARM", 2),
170 ("BUILDING", 3),
171 ("HUT", 4),
172 ("LIGHTHOUSE", 5)])
173 auto = AutoTransientTable(self.transientdb, simple)
174 filename = os.path.join("..", "Data", "iceland",
175 "cultural_landmark-point.dbf")
176 landmarks = AutoTransientTable(self.transientdb, DBFTable(filename))
177
178 table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
179 auto, "type")
180
181 self.assertEquals(table.NumRows(), 34)
182 self.assertEquals(table.NumColumns(), 8)
183 self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
184 self.assertEquals(table.Column(0).name, 'AREA')
185 self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
186 self.assertEquals(table.Column(7).name, 'code')
187 self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
188 self.assertEquals(table.Column(4).name, 'CLPTLABEL')
189 # HasColumn
190 self.failUnless(table.HasColumn("AREA"))
191 self.failUnless(table.HasColumn(1))
192 # HasColumn for non-exisiting columns
193 self.failIf(table.HasColumn("non_existing_name"))
194 self.failIf(table.HasColumn(100))
195
196 # Reading rows and values
197 self.assertEquals(table.ReadRowAsDict(22),
198 {'PERIMETER': 0.0, 'CLPOINT_': 23,
199 'AREA': 0.0, 'CLPTLABEL': 'RUINS',
200 'CLPOINT_ID': 38, 'CLPTFLAG': 0,
201 'code': 1, 'type': 'RUINS'})
202 self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
203 self.assertEquals(table.ReadValue(22, 7), 1)
204
205 # The transient_table method should return the table itself
206 self.assert_(table is table.transient_table())
207
208 # The TransientJoinedTable depends on both input tables
209 self.assertEquals(table.Dependencies(), (landmarks, auto))
210
211 # The title is constructed from the titles of the input tables.
212 self.assertEquals(table.Title(),
213 "Join of %s and %s" % (landmarks.Title(),
214 auto.Title()))
215
216
217 def test_transient_joined_table_same_column_name(self):
218 """Test TransientJoinedTable join on columns with same name
219
220 The transient DB maps the column names used by the tables to
221 another set of names used only inside the SQLite database. There
222 was a bug in the way this mapping was used when joining on
223 fields with the same names in both tables so that the joined
224 table ended up joining on the same column in the same table.
225 """
226 mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
227 [(i,) for i in range(4)])
228 stretches = AutoTransientTable(self.transientdb, mem_stretches)
229
230 mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
231 ("stretch_id", FIELDTYPE_INT)],
232 [(1, 0), (2, 3)])
233 discharges = AutoTransientTable(self.transientdb, mem_discharges)
234
235 table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
236 discharges, "stretch_id",
237 outer_join = True)
238
239 self.assertEquals(table.NumRows(), 4)
240 self.assertEquals(table.NumColumns(), 2)
241
242 # HasColumn
243 self.failUnless(table.HasColumn("stretch_id"))
244 self.failUnless(table.HasColumn("disch_id"))
245
246
247 def test_transient_joined_table_with_equal_column_names(self):
248 """Test TransientJoinedTable join on tables with equal column names
249
250 The join of two tables contains all fields from both tables instead
251 th field the join was performed on. This special field is included
252 once. If a name collision occurs for the field names, underscores are
253 appended as long as any collision is resolved.
254 """
255 mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
256 ("name", FIELDTYPE_INT)],
257 [(0, 10), (1, 11), (2, 12), (3, 13) ])
258 stretches = AutoTransientTable(self.transientdb, mem_stretches)
259
260 mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
261 ("stretch_id", FIELDTYPE_INT),
262 ("name", FIELDTYPE_INT)],
263 [(1, 0, 1), (2, 3, 2)])
264 discharges = AutoTransientTable(self.transientdb, mem_discharges)
265
266 table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
267 discharges, "stretch_id",
268 outer_join = True)
269
270 self.assertEquals(table.NumRows(), 4)
271 self.assertEquals(table.NumColumns(), 4)
272
273 # HasColumn
274 self.failUnless(table.HasColumn("stretch_id"))
275 self.failUnless(table.HasColumn("disch_id"))
276 self.failUnless(table.HasColumn("name"))
277 self.failUnless(table.HasColumn("name_"))
278
279
280 def test_transient_table_read_twice(self):
281 """Test TransientTable.ReadRowAsDict() reading the same record twice"""
282 simple = MemoryTable([("type", FIELDTYPE_STRING),
283 ("code", FIELDTYPE_INT)],
284 [("OTHER/UNKNOWN", 0),
285 ("RUINS", 1),
286 ("FARM", 2),
287 ("BUILDING", 3),
288 ("HUT", 4),
289 ("LIGHTHOUSE", 5)])
290 table = TransientTable(self.transientdb, simple)
291
292 # There was a bug where reading the same record twice would
293 # raise an exception in the second call because of an
294 # unitialized local variable, so for passing the test it's
295 # enough if reading simply succeeds. OTOH, while we're at it we
296 # might as well check whether the results are equal anyway :)
297 result1 = table.ReadRowAsDict(3)
298 result2 = table.ReadRowAsDict(3)
299 self.assertEquals(result1, result2)
300
301
302 def test_transient_table_query(self):
303 """Test TransientTable.SimpleQuery()"""
304 simple = MemoryTable([("type", FIELDTYPE_STRING),
305 ("value", FIELDTYPE_DOUBLE),
306 ("code", FIELDTYPE_INT)],
307 [("OTHER/UNKNOWN", -1.5, 11),
308 ("RUINS", 0.0, 1),
309 ("FARM", 3.141, 2),
310 ("BUILDING", 2.5, 3),
311 ("HUT", 1e6, 4),
312 ("LIGHTHOUSE", -0.01, 5)])
313 table = TransientTable(self.transientdb, simple)
314
315 # A column and a value
316 self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
317 [1])
318 self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
319 [0, 1, 3, 4, 5])
320 self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
321 [0, 1, 5])
322 self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
323 [0])
324 self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
325 [0, 4, 5])
326 self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
327 [0, 3, 4, 5])
328
329 # Two columns as operands
330 self.assertEquals(table.SimpleQuery(table.Column(1),
331 "<=", table.Column(2)),
332 [0, 1, 3, 5])
333
334 # Test whether invalid operators raise a ValueError
335 self.assertRaises(ValueError,
336 table.SimpleQuery,
337 table.Column(1), "<<", table.Column(2))
338
339
340 if __name__ == "__main__":
341 support.run_tests()

Properties

Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26