/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Contents of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1381 - (show annotations)
Tue Jul 8 16:37:46 2003 UTC (21 years, 8 months ago) by bh
Original Path: trunk/thuban/test/test_transientdb.py
File MIME type: text/x-python
File size: 17439 byte(s)
* Thuban/Model/transientdb.py (TransientTableBase.Width): The type
constants in the column objects are the standard ones defined in
the table module.

* test/test_transientdb.py
(TestTransientTable.test_transienttable_to_dbf): New. Test whether
exporting transient tables as DBF works. This should catch the bug
just fixed in TransientTableBase.Width.

1 # Copyright (c) 2002, 2003 by Intevation GmbH
2 # Authors:
3 # Bernhard Herzog <[email protected]>
4 #
5 # This program is free software under the GPL (>=v2)
6 # Read the file COPYING coming with Thuban for details.
7
8 """
9 Test the Transient DB classes
10 """
11
12 __version__ = "$Revision$"
13 # $Source$
14 # $Id$
15
16 import os
17 import unittest
18
19 import support
20 support.initthuban()
21
22 import dbflib
23 from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25 from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26 TransientJoinedTable, AutoTransientTable
27
28
29 class TestTransientTable(unittest.TestCase, support.FileTestMixin):
30
31 def setUp(self):
32 """Create a transient database as self.transientdb"""
33 filename = self.temp_file_name("transient_table.sqlite")
34 if os.path.exists(filename):
35 os.remove(filename)
36 journal = filename + "-journal"
37 if os.path.exists(journal):
38 print "removing journal", journal
39 os.remove(journal)
40 self.transientdb = TransientDatabase(filename)
41
42 def tearDown(self):
43 self.transientdb.close()
44
45 def run_iceland_political_tests(self, table):
46 """Run some tests on tablte
47
48 Assume that table holds the data of the file
49 ../Data/iceland/political.dbf sample file.
50 """
51 self.assertEquals(table.NumRows(), 156)
52 self.assertEquals(table.NumColumns(), 8)
53
54 # Check one each of the possible field types.
55 columns = table.Columns()
56 self.assertEquals(columns[0].name, 'AREA')
57 self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
58 self.assertEquals(columns[3].name, 'PONET_ID')
59 self.assertEquals(columns[3].type, FIELDTYPE_INT)
60 self.assertEquals(columns[6].name, 'POPYCOUN')
61 self.assertEquals(columns[6].type, FIELDTYPE_STRING)
62
63 # HasColumn
64 self.failUnless(table.HasColumn("AREA"))
65 self.failUnless(table.HasColumn(1))
66 # HasColumn for non-exisiting columns
67 self.failIf(table.HasColumn("non_existing_name"))
68 self.failIf(table.HasColumn(100))
69
70 # Reading rows and values.
71 self.assertEquals(table.ReadRowAsDict(144),
72 {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
73 'AREA': 19.462,
74 'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
75 'POPYREG': '1',
76 'PONET_ID': 145})
77 self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
78 self.assertEquals(table.ReadValue(144, 3), 145)
79
80 # ValueRange may induce a copy to the transient database.
81 # Therefore we put it last so that we can execute this method
82 # twice to check whether the other methods still work after the
83 # copy
84 self.assertEquals(table.ValueRange("AREA"), (0.0, 19.462))
85
86 unique = table.UniqueValues("PONET_ID")
87 unique.sort()
88 self.assertEquals(unique, range(1, 157))
89
90 def test_transient_table(self):
91 """Test TransientTable(dbftable)
92
93 The TransientTable should copy the data to the
94 TransientDatabase.
95 """
96 orig_table = DBFTable(os.path.join("..", "Data", "iceland",
97 "political.dbf"))
98 table = TransientTable(self.transientdb, orig_table)
99 self.run_iceland_political_tests(table)
100
101 # The transient_table method should return the table itself
102 self.assert_(table is table.transient_table())
103
104 # The title is simply copied over from the original table
105 self.assertEquals(table.Title(), orig_table.Title())
106
107 # The TransientTable class itself doesn't implement the
108 # Dependencies method, so we don't test it.
109
110
111 def test_auto_transient_table(self):
112 """Test AutoTransientTable(dbftable)
113
114 The AutoTransientTable should copy the data to the
115 TransientDatabase on demand.
116 """
117 orig_table = DBFTable(os.path.join("..", "Data", "iceland",
118 "political.dbf"))
119 table = AutoTransientTable(self.transientdb, orig_table)
120
121 # Run the tests twice so that we execute them once when the data
122 # has not been copied to the transient db yet and once when it
123 # has. This assumes that run_iceland_political_tests does at
124 # least one call to a method that copies to the transient db at
125 # its end.
126 self.run_iceland_political_tests(table)
127 self.run_iceland_political_tests(table)
128
129 def test_auto_transient_table_query(self):
130 """Test AutoTransientTable.SimpleQuery()"""
131 orig_table = DBFTable(os.path.join("..", "Data", "iceland",
132 "political.dbf"))
133 table = AutoTransientTable(self.transientdb, orig_table)
134 # Only a simple test here. The AutoTransientTable simply
135 # delegates to its transient table so it should be OK that the
136 # real test for it is in test_transient_table_query. However,
137 # it's important to check that the column handling works
138 # correctly because the AutoTransientTable and it's underlying
139 # transient table use different column object types.
140 self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
141 [144])
142
143 # test using a Column object as the right parameter
144 self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
145 "==",
146 table.Column("POPYREG")),
147 range(156))
148
149 def test_auto_transient_table_dependencies(self):
150 """Test AutoTransientTable.Dependencies()"""
151 orig_table = DBFTable(os.path.join("..", "Data", "iceland",
152 "political.dbf"))
153 table = AutoTransientTable(self.transientdb, orig_table)
154 self.assertEquals(table.Dependencies(), (orig_table,))
155
156 def test_auto_transient_table_title(self):
157 """Test AutoTransientTable.Title()"""
158 orig_table = DBFTable(os.path.join("..", "Data", "iceland",
159 "political.dbf"))
160 table = AutoTransientTable(self.transientdb, orig_table)
161 # The title is of course the same as that of the original table
162 self.assertEquals(table.Title(), orig_table.Title())
163
164 def test_transient_joined_table(self):
165 """Test TransientJoinedTable"""
166 simple = MemoryTable([("type", FIELDTYPE_STRING),
167 ("code", FIELDTYPE_INT)],
168 [("OTHER/UNKNOWN", 0),
169 ("RUINS", 1),
170 ("FARM", 2),
171 ("BUILDING", 3),
172 ("HUT", 4),
173 ("LIGHTHOUSE", 5)])
174 auto = AutoTransientTable(self.transientdb, simple)
175 filename = os.path.join("..", "Data", "iceland",
176 "cultural_landmark-point.dbf")
177 landmarks = AutoTransientTable(self.transientdb, DBFTable(filename))
178
179 table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
180 auto, "type")
181
182 self.assertEquals(table.NumRows(), 34)
183 self.assertEquals(table.NumColumns(), 8)
184 self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
185 self.assertEquals(table.Column(0).name, 'AREA')
186 self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
187 self.assertEquals(table.Column(7).name, 'code')
188 self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
189 self.assertEquals(table.Column(4).name, 'CLPTLABEL')
190 # HasColumn
191 self.failUnless(table.HasColumn("AREA"))
192 self.failUnless(table.HasColumn(1))
193 # HasColumn for non-exisiting columns
194 self.failIf(table.HasColumn("non_existing_name"))
195 self.failIf(table.HasColumn(100))
196
197 # Reading rows and values
198 self.assertEquals(table.ReadRowAsDict(22),
199 {'PERIMETER': 0.0, 'CLPOINT_': 23,
200 'AREA': 0.0, 'CLPTLABEL': 'RUINS',
201 'CLPOINT_ID': 38, 'CLPTFLAG': 0,
202 'code': 1, 'type': 'RUINS'})
203 self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
204 self.assertEquals(table.ReadValue(22, 7), 1)
205
206 # The transient_table method should return the table itself
207 self.assert_(table is table.transient_table())
208
209 # The TransientJoinedTable depends on both input tables
210 self.assertEquals(table.Dependencies(), (landmarks, auto))
211
212 # The title is constructed from the titles of the input tables.
213 self.assertEquals(table.Title(),
214 "Join of %s and %s" % (landmarks.Title(),
215 auto.Title()))
216
217
218 def test_transient_joined_table_same_column_name(self):
219 """Test TransientJoinedTable join on columns with same name
220
221 The transient DB maps the column names used by the tables to
222 another set of names used only inside the SQLite database. There
223 was a bug in the way this mapping was used when joining on
224 fields with the same names in both tables so that the joined
225 table ended up joining on the same column in the same table.
226 """
227 mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
228 [(i,) for i in range(4)])
229 stretches = AutoTransientTable(self.transientdb, mem_stretches)
230
231 mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
232 ("stretch_id", FIELDTYPE_INT)],
233 [(1, 0), (2, 3)])
234 discharges = AutoTransientTable(self.transientdb, mem_discharges)
235
236 table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
237 discharges, "stretch_id",
238 outer_join = True)
239
240 self.assertEquals(table.NumRows(), 4)
241 self.assertEquals(table.NumColumns(), 3)
242
243 # HasColumn
244 self.failUnless(table.HasColumn("stretch_id"))
245 self.failUnless(table.HasColumn("disch_id"))
246
247
248 def test_transient_joined_table_with_equal_column_names(self):
249 """Test TransientJoinedTable join on tables with equal column names
250
251 If a name collision occurs for the field names, underscores are
252 appended as long as any collision is resolved.
253 """
254 mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
255 ("name", FIELDTYPE_INT)],
256 [(0, 10), (1, 11), (2, 12), (3, 13) ])
257 stretches = AutoTransientTable(self.transientdb, mem_stretches)
258
259 mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
260 ("stretch_id", FIELDTYPE_INT),
261 ("name", FIELDTYPE_INT)],
262 [(1, 0, 1), (2, 3, 2)])
263 discharges = AutoTransientTable(self.transientdb, mem_discharges)
264
265 table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
266 discharges, "stretch_id",
267 outer_join = True)
268
269 self.assertEquals(table.NumRows(), 4)
270 self.assertEquals(table.NumColumns(), 5)
271
272 # HasColumn
273 self.assertEquals([c.name for c in table.Columns()],
274 ["stretch_id", "name", "disch_id", "stretch_id_",
275 "name_"])
276
277 def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
278 """Test TransientJoinedTable name-collisions do not modifying in place
279
280 The name collision work-around by appending underscores
281 accidentally modified the column objects in place. We do two
282 joins therefore in reverse order to detect this: The first join
283 will lead to a modified name in the column object of the right
284 table which is then used as the left table in the second join so
285 the underscored name will appear before the non-underscored one
286 in the list of column names after the second join.
287 """
288 mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
289 ("name", FIELDTYPE_INT)],
290 [(0, 10), (1, 11), (2, 12), (3, 13) ])
291 table1 = AutoTransientTable(self.transientdb, mem1)
292
293 mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
294 ("name", FIELDTYPE_INT)],
295 [(0, 10), (1, 11), (2, 12), (3, 13) ])
296 table2 = AutoTransientTable(self.transientdb, mem2)
297
298 table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
299 table2, "stretch_id", outer_join = True)
300 table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
301 table1, "stretch_id", outer_join = True)
302
303 self.assertEquals([c.name for c in table.Columns()],
304 ["stretch_id", "name", "stretch_id_", "name_"])
305
306 def test_transient_table_read_twice(self):
307 """Test TransientTable.ReadRowAsDict() reading the same record twice"""
308 simple = MemoryTable([("type", FIELDTYPE_STRING),
309 ("code", FIELDTYPE_INT)],
310 [("OTHER/UNKNOWN", 0),
311 ("RUINS", 1),
312 ("FARM", 2),
313 ("BUILDING", 3),
314 ("HUT", 4),
315 ("LIGHTHOUSE", 5)])
316 table = TransientTable(self.transientdb, simple)
317
318 # There was a bug where reading the same record twice would
319 # raise an exception in the second call because of an
320 # unitialized local variable, so for passing the test it's
321 # enough if reading simply succeeds. OTOH, while we're at it we
322 # might as well check whether the results are equal anyway :)
323 result1 = table.ReadRowAsDict(3)
324 result2 = table.ReadRowAsDict(3)
325 self.assertEquals(result1, result2)
326
327
328 def test_transient_table_query(self):
329 """Test TransientTable.SimpleQuery()"""
330 simple = MemoryTable([("type", FIELDTYPE_STRING),
331 ("value", FIELDTYPE_DOUBLE),
332 ("code", FIELDTYPE_INT)],
333 [("OTHER/UNKNOWN", -1.5, 11),
334 ("RUINS", 0.0, 1),
335 ("FARM", 3.141, 2),
336 ("BUILDING", 2.5, 3),
337 ("HUT", 1e6, 4),
338 ("LIGHTHOUSE", -0.01, 5)])
339 table = TransientTable(self.transientdb, simple)
340
341 # A column and a value
342 self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
343 [1])
344 self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
345 [0, 1, 3, 4, 5])
346 self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
347 [0, 1, 5])
348 self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
349 [0])
350 self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
351 [0, 4, 5])
352 self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
353 [0, 3, 4, 5])
354
355 # Two columns as operands
356 self.assertEquals(table.SimpleQuery(table.Column(1),
357 "<=", table.Column(2)),
358 [0, 1, 3, 5])
359
360 # Test whether invalid operators raise a ValueError
361 self.assertRaises(ValueError,
362 table.SimpleQuery,
363 table.Column(1), "<<", table.Column(2))
364
365 def test_transienttable_to_dbf(self):
366 memtable = MemoryTable([("type", FIELDTYPE_STRING),
367 ("value", FIELDTYPE_DOUBLE),
368 ("code", FIELDTYPE_INT)],
369 [("UNKNOWN", 0.0, 0),
370 ("Foo", 0.5, -1),
371 ("Foo", 1.0/256, 100),
372 ("bar", 1e10, 17)])
373 table = TransientTable(self.transientdb, memtable)
374 filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
375 table_to_dbf(table, filename)
376
377 dbf = dbflib.DBFFile(filename)
378 self.assertEquals(dbf.read_record(2),
379 {'code': 100, 'type': 'Foo', 'value': 0.00390625})
380 self.assertEquals(dbf.field_count(), 3)
381 self.assertEquals(dbf.record_count(), 4)
382 self.assertEquals(dbf.field_info(0),
383 (dbflib.FTString, "type", 7, 0))
384 self.assertEquals(dbf.field_info(1),
385 (dbflib.FTDouble, "value", 24, 12))
386 self.assertEquals(dbf.field_info(2),
387 (dbflib.FTInteger, "code", 3, 0))
388
389
390
391 if __name__ == "__main__":
392 support.run_tests()

Properties

Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26