ViewVC logotype

Contents of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory | Revision Log

Revision 1924 - (show annotations)
Fri Nov 7 12:07:11 2003 UTC (21 years, 4 months ago) by bh
Original Path: trunk/thuban/test/test_transientdb.py
File MIME type: text/x-python
File size: 18613 byte(s)
(TestTransientTable.test_auto_transient_table): Make sure that the
data is copied to the transient database at some point.

1 # Copyright (c) 2002, 2003 by Intevation GmbH
2 # Authors:
3 # Bernhard Herzog <[email protected]>
4 #
5 # This program is free software under the GPL (>=v2)
6 # Read the file COPYING coming with Thuban for details.
8 """
9 Test the Transient DB classes
10 """
12 __version__ = "$Revision$"
13 # $Source$
14 # $Id$
import os
import unittest

# NOTE(review): support.initthuban() is kept ahead of the dbflib/Thuban
# imports exactly as in the original; presumably it prepares the import
# path for them -- confirm before reordering.
import support
support.initthuban()

import dbflib
from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
     FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
     TransientJoinedTable, AutoTransientTable
class TestTransientTable(unittest.TestCase, support.FileTestMixin):

    """Test cases for the table classes of Thuban.Model.transientdb

    Every test method gets a fresh TransientDatabase as
    self.transientdb (created in setUp, closed in tearDown).
    """

    def setUp(self):
        """Create a transient database as self.transientdb"""
        filename = self.temp_file_name("transient_table.sqlite")
        # Start from scratch: remove a database file left over from an
        # earlier run.
        if os.path.exists(filename):
            os.remove(filename)
        # Also remove a leftover "-journal" companion file (presumably
        # the journal the database engine keeps next to the db file).
        journal = filename + "-journal"
        if os.path.exists(journal):
            # A single parenthesised argument prints the same text
            # whether print is a statement or a function.
            print("removing journal %s" % journal)
            os.remove(journal)
        self.transientdb = TransientDatabase(filename)

    def tearDown(self):
        """Close the transient database created in setUp"""
        self.transientdb.close()

    def _open_iceland_table(self, basename):
        """Return a DBFTable for the file basename in ../Data/iceland

        The path is relative, so the tests have to be run from a
        directory for which ../Data/iceland exists.
        """
        return DBFTable(os.path.join("..", "Data", "iceland", basename))

    def run_iceland_political_tests(self, table):
        """Run some tests on table

        Assume that table holds the data of the file
        ../Data/iceland/political.dbf sample file.
        """
        self.assertEquals(table.NumRows(), 156)
        self.assertEquals(table.NumColumns(), 8)

        # Check one each of the possible field types.
        columns = table.Columns()
        self.assertEquals(columns[0].name, 'AREA')
        self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
        self.assertEquals(columns[3].name, 'PONET_ID')
        self.assertEquals(columns[3].type, FIELDTYPE_INT)
        self.assertEquals(columns[6].name, 'POPYCOUN')
        self.assertEquals(columns[6].type, FIELDTYPE_STRING)

        # HasColumn
        self.failUnless(table.HasColumn("AREA"))
        self.failUnless(table.HasColumn(1))
        # HasColumn for non-existing columns
        self.failIf(table.HasColumn("non_existing_name"))
        self.failIf(table.HasColumn(100))

        # Reading rows and values.
        expected_row = {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
                        'AREA': 19.462,
                        'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
                        'POPYREG': '1',
                        'PONET_ID': 145}
        self.assertEquals(table.ReadRowAsDict(144), expected_row)
        self.assertEquals(table.ReadRowAsDict(144, row_is_ordinal = 1),
                          expected_row)
        self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
        self.assertEquals(table.ReadValue(144, 3), 145)
        self.assertEquals(table.ReadValue(144, "AREA", row_is_ordinal = 1),
                          19.462)
        self.assertEquals(table.ReadValue(144, 3, row_is_ordinal = 1), 145)

        self.assertEquals(table.RowIdToOrdinal(23), 23)
        self.assertEquals(table.RowOrdinalToId(23), 23)

        # ValueRange may induce a copy to the transient database.
        # Therefore we put it last so that we can execute this method
        # twice to check whether the other methods still work after the
        # copy
        self.assertEquals(table.ValueRange("AREA"), (0.0, 19.462))

        unique = table.UniqueValues("PONET_ID")
        unique.sort()
        # list() keeps the comparison a list-to-list one even where
        # range() does not return a list.
        self.assertEquals(unique, list(range(1, 157)))

    def test_transient_table(self):
        """Test TransientTable(dbftable)

        The TransientTable should copy the data to the
        TransientDatabase.
        """
        orig_table = self._open_iceland_table("political.dbf")
        table = TransientTable(self.transientdb, orig_table)
        self.run_iceland_political_tests(table)

        # The transient_table method should return the table itself
        self.assert_(table is table.transient_table())

        # The title is simply copied over from the original table
        self.assertEquals(table.Title(), orig_table.Title())

        # The TransientTable class itself doesn't implement the
        # Dependencies method, so we don't test it.

    def test_auto_transient_table(self):
        """Test AutoTransientTable(dbftable)

        The AutoTransientTable should copy the data to the
        TransientDatabase on demand.
        """
        orig_table = self._open_iceland_table("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)

        # Run the tests twice so that we execute them once when the data
        # has not been copied to the transient db yet and once when it
        # has.
        self.run_iceland_political_tests(table)

        # At this point the data has probably not been copied to the
        # transient DB yet, so we force it by calling the
        # transient_table method.
        table.transient_table()

        # Run the tests again.
        self.run_iceland_political_tests(table)

    def test_auto_transient_table_query(self):
        """Test AutoTransientTable.SimpleQuery()"""
        orig_table = self._open_iceland_table("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)
        # Only a simple test here. The AutoTransientTable simply
        # delegates to its transient table so it should be OK that the
        # real test for it is in test_transient_table_query. However,
        # it's important to check that the column handling works
        # correctly because the AutoTransientTable and its underlying
        # transient table use different column object types.
        self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
                          [144])

        # test using a Column object as the right parameter
        self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
                                            "==",
                                            table.Column("POPYREG")),
                          list(range(156)))

    def test_auto_transient_table_dependencies(self):
        """Test AutoTransientTable.Dependencies()"""
        orig_table = self._open_iceland_table("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)
        self.assertEquals(table.Dependencies(), (orig_table,))

    def test_auto_transient_table_title(self):
        """Test AutoTransientTable.Title()"""
        orig_table = self._open_iceland_table("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)
        # The title is of course the same as that of the original table
        self.assertEquals(table.Title(), orig_table.Title())

    def test_transient_joined_table(self):
        """Test TransientJoinedTable"""
        simple = MemoryTable([("type", FIELDTYPE_STRING),
                              ("code", FIELDTYPE_INT)],
                             [("OTHER/UNKNOWN", 0),
                              ("RUINS", 1),
                              ("FARM", 2),
                              ("BUILDING", 3),
                              ("HUT", 4),
                              ("LIGHTHOUSE", 5)])
        auto = AutoTransientTable(self.transientdb, simple)
        landmarks = AutoTransientTable(
            self.transientdb,
            self._open_iceland_table("cultural_landmark-point.dbf"))

        table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
                                     auto, "type")

        self.assertEquals(table.NumRows(), 34)
        self.assertEquals(table.NumColumns(), 8)
        self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
        self.assertEquals(table.Column(0).name, 'AREA')
        self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
        self.assertEquals(table.Column(7).name, 'code')
        self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
        self.assertEquals(table.Column(4).name, 'CLPTLABEL')
        # HasColumn
        self.failUnless(table.HasColumn("AREA"))
        self.failUnless(table.HasColumn(1))
        # HasColumn for non-existing columns
        self.failIf(table.HasColumn("non_existing_name"))
        self.failIf(table.HasColumn(100))

        # Reading rows and values
        self.assertEquals(table.ReadRowAsDict(22),
                          {'PERIMETER': 0.0, 'CLPOINT_': 23,
                           'AREA': 0.0, 'CLPTLABEL': 'RUINS',
                           'CLPOINT_ID': 38, 'CLPTFLAG': 0,
                           'code': 1, 'type': 'RUINS'})
        self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
        self.assertEquals(table.ReadValue(22, 7), 1)
        self.assertEquals(table.ReadValue(22, "type", row_is_ordinal = 1),
                          "RUINS")
        self.assertEquals(table.ReadValue(22, 7, row_is_ordinal = 1), 1)
        self.assertEquals(table.RowIdToOrdinal(23), 23)
        self.assertEquals(table.RowOrdinalToId(23), 23)

        # The transient_table method should return the table itself
        self.assert_(table is table.transient_table())

        # The TransientJoinedTable depends on both input tables
        self.assertEquals(table.Dependencies(), (landmarks, auto))

        # The title is constructed from the titles of the input tables.
        self.assertEquals(table.Title(),
                          "Join of %s and %s" % (landmarks.Title(),
                                                 auto.Title()))

    def test_transient_joined_table_same_column_name(self):
        """Test TransientJoinedTable join on columns with same name

        The transient DB maps the column names used by the tables to
        another set of names used only inside the SQLite database. There
        was a bug in the way this mapping was used when joining on
        fields with the same names in both tables so that the joined
        table ended up joining on the same column in the same table.
        """
        mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
                                    [(i,) for i in range(4)])
        stretches = AutoTransientTable(self.transientdb, mem_stretches)

        mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
                                      ("stretch_id", FIELDTYPE_INT)],
                                     [(1, 0), (2, 3)])
        discharges = AutoTransientTable(self.transientdb, mem_discharges)

        table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
                                     discharges, "stretch_id",
                                     outer_join = True)

        self.assertEquals(table.NumRows(), 4)
        self.assertEquals(table.NumColumns(), 3)

        # HasColumn
        self.failUnless(table.HasColumn("stretch_id"))
        self.failUnless(table.HasColumn("disch_id"))

    def test_transient_joined_table_with_equal_column_names(self):
        """Test TransientJoinedTable join on tables with equal column names

        If a name collision occurs for the field names, underscores are
        appended until the collision is resolved.
        """
        mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
                                     ("name", FIELDTYPE_INT)],
                                    [(0, 10), (1, 11), (2, 12), (3, 13)])
        stretches = AutoTransientTable(self.transientdb, mem_stretches)

        mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
                                      ("stretch_id", FIELDTYPE_INT),
                                      ("name", FIELDTYPE_INT)],
                                     [(1, 0, 1), (2, 3, 2)])
        discharges = AutoTransientTable(self.transientdb, mem_discharges)

        table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
                                     discharges, "stretch_id",
                                     outer_join = True)

        self.assertEquals(table.NumRows(), 4)
        self.assertEquals(table.NumColumns(), 5)

        # The colliding names of the right table get an underscore
        # appended.
        self.assertEquals([c.name for c in table.Columns()],
                          ["stretch_id", "name", "disch_id", "stretch_id_",
                           "name_"])

    def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
        """Test TransientJoinedTable name-collisions do not modify in place

        The name collision work-around by appending underscores
        accidentally modified the column objects in place. We do two
        joins therefore in reverse order to detect this: The first join
        will lead to a modified name in the column object of the right
        table which is then used as the left table in the second join so
        the underscored name will appear before the non-underscored one
        in the list of column names after the second join.
        """
        mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
                            ("name", FIELDTYPE_INT)],
                           [(0, 10), (1, 11), (2, 12), (3, 13)])
        table1 = AutoTransientTable(self.transientdb, mem1)

        mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
                            ("name", FIELDTYPE_INT)],
                           [(0, 10), (1, 11), (2, 12), (3, 13)])
        table2 = AutoTransientTable(self.transientdb, mem2)

        # The result of the first join is deliberately thrown away; only
        # the column names of the second, reversed join are checked.
        table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
                                     table2, "stretch_id", outer_join = True)
        table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
                                     table1, "stretch_id", outer_join = True)

        self.assertEquals([c.name for c in table.Columns()],
                          ["stretch_id", "name", "stretch_id_", "name_"])

    def test_transient_table_read_twice(self):
        """Test TransientTable.ReadRowAsDict() reading the same record twice"""
        simple = MemoryTable([("type", FIELDTYPE_STRING),
                              ("code", FIELDTYPE_INT)],
                             [("OTHER/UNKNOWN", 0),
                              ("RUINS", 1),
                              ("FARM", 2),
                              ("BUILDING", 3),
                              ("HUT", 4),
                              ("LIGHTHOUSE", 5)])
        table = TransientTable(self.transientdb, simple)

        # There was a bug where reading the same record twice would
        # raise an exception in the second call because of an
        # uninitialized local variable, so for passing the test it's
        # enough if reading simply succeeds. OTOH, while we're at it we
        # might as well check whether the results are equal anyway :)
        result1 = table.ReadRowAsDict(3)
        result2 = table.ReadRowAsDict(3)
        self.assertEquals(result1, result2)

    def test_transient_table_query(self):
        """Test TransientTable.SimpleQuery()"""
        simple = MemoryTable([("type", FIELDTYPE_STRING),
                              ("value", FIELDTYPE_DOUBLE),
                              ("code", FIELDTYPE_INT)],
                             [("OTHER/UNKNOWN", -1.5, 11),
                              ("RUINS", 0.0, 1),
                              ("FARM", 3.141, 2),
                              ("BUILDING", 2.5, 3),
                              ("HUT", 1e6, 4),
                              ("LIGHTHOUSE", -0.01, 5)])
        table = TransientTable(self.transientdb, simple)

        # A column and a value
        self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
                          [1])
        self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
                          [0, 1, 3, 4, 5])
        self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
                          [0, 1, 5])
        self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
                          [0])
        self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
                          [0, 4, 5])
        self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
                          [0, 3, 4, 5])

        # Two columns as operands
        self.assertEquals(table.SimpleQuery(table.Column(1),
                                            "<=", table.Column(2)),
                          [0, 1, 3, 5])

        # Test whether invalid operators raise a ValueError
        self.assertRaises(ValueError,
                          table.SimpleQuery,
                          table.Column(1), "<<", table.Column(2))

    def test_transienttable_to_dbf(self):
        """Test table_to_dbf() with a TransientTable"""
        memtable = MemoryTable([("type", FIELDTYPE_STRING),
                                ("value", FIELDTYPE_DOUBLE),
                                ("code", FIELDTYPE_INT)],
                               [("UNKNOWN", 0.0, 0),
                                ("Foo", 0.5, -1),
                                ("Foo", 1.0/256, 100),
                                ("bar", 1e10, 17)])
        table = TransientTable(self.transientdb, memtable)
        filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
        table_to_dbf(table, filename)

        # Read the written file back with dbflib and check contents and
        # field layout.
        dbf = dbflib.DBFFile(filename)
        try:
            self.assertEquals(dbf.read_record(2),
                              {'code': 100, 'type': 'Foo',
                               'value': 0.00390625})
            self.assertEquals(dbf.field_count(), 3)
            self.assertEquals(dbf.record_count(), 4)
            self.assertEquals(dbf.field_info(0),
                              (dbflib.FTString, "type", 7, 0))
            self.assertEquals(dbf.field_info(1),
                              (dbflib.FTDouble, "value", 24, 12))
            self.assertEquals(dbf.field_info(2),
                              (dbflib.FTInteger, "code", 3, 0))
        finally:
            # Release the file handle even when an assertion fails so
            # the temporary file is not left open.
            dbf.close()
if __name__ == "__main__":
    # When the file is executed directly as a script, hand control to
    # the run_tests helper of the test support module.
    support.run_tests()


Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26