/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Annotation of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1381 - (hide annotations)
Tue Jul 8 16:37:46 2003 UTC (21 years, 8 months ago) by bh
Original Path: trunk/thuban/test/test_transientdb.py
File MIME type: text/x-python
File size: 17439 byte(s)
* Thuban/Model/transientdb.py (TransientTableBase.Width): The type
constants in the column objects are the standard ones defined in
the table module.

* test/test_transientdb.py
(TestTransientTable.test_transienttable_to_dbf): New. Test whether
exporting transient tables as DBF works. This should catch the bug
just fixed in TransientTableBase.Width.

1 bh 765 # Copyright (c) 2002, 2003 by Intevation GmbH
2     # Authors:
3     # Bernhard Herzog <[email protected]>
4     #
5     # This program is free software under the GPL (>=v2)
6     # Read the file COPYING coming with Thuban for details.
7    
8     """
9     Test the Transient DB classes
10     """
11    
12     __version__ = "$Revision$"
13     # $Source$
14     # $Id$
15    
16     import os
17     import unittest
18    
19     import support
20     support.initthuban()
21    
22 bh 1381 import dbflib
23 jan 805 from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24 bh 1381 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25 bh 765 from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26     TransientJoinedTable, AutoTransientTable
27    
28    
class TestTransientTable(unittest.TestCase, support.FileTestMixin):

    """Test cases for the transient database table classes

    The tests cover TransientTable, AutoTransientTable and
    TransientJoinedTable. Each test runs against a fresh
    TransientDatabase which setUp stores in self.transientdb and
    tearDown closes again.
    """

    def setUp(self):
        """Create a transient database as self.transientdb

        Any database file left over from a previous run, as well as its
        "-journal" companion file, is removed first so that every test
        starts with an empty database.
        """
        filename = self.temp_file_name("transient_table.sqlite")
        if os.path.exists(filename):
            os.remove(filename)
        journal = filename + "-journal"
        if os.path.exists(journal):
            # Single-argument form so that the output is the same
            # regardless of whether print is a statement or a function.
            print("removing journal %s" % journal)
            os.remove(journal)
        self.transientdb = TransientDatabase(filename)

    def tearDown(self):
        """Close the transient database created by setUp"""
        self.transientdb.close()

    def run_iceland_political_tests(self, table):
        """Run some tests on table

        Assume that table holds the data of the file
        ../Data/iceland/political.dbf sample file.
        """
        self.assertEquals(table.NumRows(), 156)
        self.assertEquals(table.NumColumns(), 8)

        # Check one each of the possible field types.
        columns = table.Columns()
        self.assertEquals(columns[0].name, 'AREA')
        self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
        self.assertEquals(columns[3].name, 'PONET_ID')
        self.assertEquals(columns[3].type, FIELDTYPE_INT)
        self.assertEquals(columns[6].name, 'POPYCOUN')
        self.assertEquals(columns[6].type, FIELDTYPE_STRING)

        # HasColumn
        self.failUnless(table.HasColumn("AREA"))
        self.failUnless(table.HasColumn(1))
        # HasColumn for non-existing columns
        self.failIf(table.HasColumn("non_existing_name"))
        self.failIf(table.HasColumn(100))

        # Reading rows and values.
        self.assertEquals(table.ReadRowAsDict(144),
                          {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
                           'AREA': 19.462,
                           'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
                           'POPYREG': '1',
                           'PONET_ID': 145})
        self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
        self.assertEquals(table.ReadValue(144, 3), 145)

        # ValueRange may induce a copy to the transient database.
        # Therefore we put it last so that we can execute this method
        # twice to check whether the other methods still work after the
        # copy
        self.assertEquals(table.ValueRange("AREA"), (0.0, 19.462))

        unique = table.UniqueValues("PONET_ID")
        unique.sort()
        self.assertEquals(unique, range(1, 157))

    def test_transient_table(self):
        """Test TransientTable(dbftable)

        The TransientTable should copy the data to the
        TransientDatabase.
        """
        orig_table = DBFTable(os.path.join("..", "Data", "iceland",
                                           "political.dbf"))
        table = TransientTable(self.transientdb, orig_table)
        self.run_iceland_political_tests(table)

        # The transient_table method should return the table itself
        self.assert_(table is table.transient_table())

        # The title is simply copied over from the original table
        self.assertEquals(table.Title(), orig_table.Title())

        # The TransientTable class itself doesn't implement the
        # Dependencies method, so we don't test it.

    def test_auto_transient_table(self):
        """Test AutoTransientTable(dbftable)

        The AutoTransientTable should copy the data to the
        TransientDatabase on demand.
        """
        orig_table = DBFTable(os.path.join("..", "Data", "iceland",
                                           "political.dbf"))
        table = AutoTransientTable(self.transientdb, orig_table)

        # Run the tests twice so that we execute them once when the data
        # has not been copied to the transient db yet and once when it
        # has. This assumes that run_iceland_political_tests does at
        # least one call to a method that copies to the transient db at
        # its end.
        self.run_iceland_political_tests(table)
        self.run_iceland_political_tests(table)

    def test_auto_transient_table_query(self):
        """Test AutoTransientTable.SimpleQuery()"""
        orig_table = DBFTable(os.path.join("..", "Data", "iceland",
                                           "political.dbf"))
        table = AutoTransientTable(self.transientdb, orig_table)
        # Only a simple test here. The AutoTransientTable simply
        # delegates to its transient table so it should be OK that the
        # real test for it is in test_transient_table_query. However,
        # it's important to check that the column handling works
        # correctly because the AutoTransientTable and its underlying
        # transient table use different column object types.
        self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
                          [144])

        # test using a Column object as the right parameter
        self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
                                            "==",
                                            table.Column("POPYREG")),
                          range(156))

    def test_auto_transient_table_dependencies(self):
        """Test AutoTransientTable.Dependencies()"""
        orig_table = DBFTable(os.path.join("..", "Data", "iceland",
                                           "political.dbf"))
        table = AutoTransientTable(self.transientdb, orig_table)
        self.assertEquals(table.Dependencies(), (orig_table,))

    def test_auto_transient_table_title(self):
        """Test AutoTransientTable.Title()"""
        orig_table = DBFTable(os.path.join("..", "Data", "iceland",
                                           "political.dbf"))
        table = AutoTransientTable(self.transientdb, orig_table)
        # The title is of course the same as that of the original table
        self.assertEquals(table.Title(), orig_table.Title())

    def test_transient_joined_table(self):
        """Test TransientJoinedTable"""
        simple = MemoryTable([("type", FIELDTYPE_STRING),
                              ("code", FIELDTYPE_INT)],
                             [("OTHER/UNKNOWN", 0),
                              ("RUINS", 1),
                              ("FARM", 2),
                              ("BUILDING", 3),
                              ("HUT", 4),
                              ("LIGHTHOUSE", 5)])
        auto = AutoTransientTable(self.transientdb, simple)
        filename = os.path.join("..", "Data", "iceland",
                                "cultural_landmark-point.dbf")
        landmarks = AutoTransientTable(self.transientdb, DBFTable(filename))

        table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
                                     auto, "type")

        self.assertEquals(table.NumRows(), 34)
        self.assertEquals(table.NumColumns(), 8)
        self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
        self.assertEquals(table.Column(0).name, 'AREA')
        self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
        self.assertEquals(table.Column(7).name, 'code')
        self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
        self.assertEquals(table.Column(4).name, 'CLPTLABEL')
        # HasColumn
        self.failUnless(table.HasColumn("AREA"))
        self.failUnless(table.HasColumn(1))
        # HasColumn for non-existing columns
        self.failIf(table.HasColumn("non_existing_name"))
        self.failIf(table.HasColumn(100))

        # Reading rows and values
        self.assertEquals(table.ReadRowAsDict(22),
                          {'PERIMETER': 0.0, 'CLPOINT_': 23,
                           'AREA': 0.0, 'CLPTLABEL': 'RUINS',
                           'CLPOINT_ID': 38, 'CLPTFLAG': 0,
                           'code': 1, 'type': 'RUINS'})
        self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
        self.assertEquals(table.ReadValue(22, 7), 1)

        # The transient_table method should return the table itself
        self.assert_(table is table.transient_table())

        # The TransientJoinedTable depends on both input tables
        self.assertEquals(table.Dependencies(), (landmarks, auto))

        # The title is constructed from the titles of the input tables.
        self.assertEquals(table.Title(),
                          "Join of %s and %s" % (landmarks.Title(),
                                                 auto.Title()))

    def test_transient_joined_table_same_column_name(self):
        """Test TransientJoinedTable join on columns with same name

        The transient DB maps the column names used by the tables to
        another set of names used only inside the SQLite database. There
        was a bug in the way this mapping was used when joining on
        fields with the same names in both tables so that the joined
        table ended up joining on the same column in the same table.
        """
        mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
                                    [(i,) for i in range(4)])
        stretches = AutoTransientTable(self.transientdb, mem_stretches)

        mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
                                      ("stretch_id", FIELDTYPE_INT)],
                                     [(1, 0), (2, 3)])
        discharges = AutoTransientTable(self.transientdb, mem_discharges)

        table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
                                     discharges, "stretch_id",
                                     outer_join = True)

        self.assertEquals(table.NumRows(), 4)
        self.assertEquals(table.NumColumns(), 3)

        # HasColumn
        self.failUnless(table.HasColumn("stretch_id"))
        self.failUnless(table.HasColumn("disch_id"))

    def test_transient_joined_table_with_equal_column_names(self):
        """Test TransientJoinedTable join on tables with equal column names

        If a name collision occurs for the field names, underscores are
        appended until all collisions are resolved.
        """
        mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
                                     ("name", FIELDTYPE_INT)],
                                    [(0, 10), (1, 11), (2, 12), (3, 13) ])
        stretches = AutoTransientTable(self.transientdb, mem_stretches)

        mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
                                      ("stretch_id", FIELDTYPE_INT),
                                      ("name", FIELDTYPE_INT)],
                                     [(1, 0, 1), (2, 3, 2)])
        discharges = AutoTransientTable(self.transientdb, mem_discharges)

        table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
                                     discharges, "stretch_id",
                                     outer_join = True)

        self.assertEquals(table.NumRows(), 4)
        self.assertEquals(table.NumColumns(), 5)

        # Column names: the colliding names of the right table get an
        # underscore appended
        self.assertEquals([c.name for c in table.Columns()],
                          ["stretch_id", "name", "disch_id", "stretch_id_",
                           "name_"])

    def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
        """Test TransientJoinedTable name-collisions do not modify in place

        The name collision work-around by appending underscores
        accidentally modified the column objects in place. We do two
        joins therefore in reverse order to detect this: The first join
        will lead to a modified name in the column object of the right
        table which is then used as the left table in the second join so
        the underscored name will appear before the non-underscored one
        in the list of column names after the second join.
        """
        mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
                            ("name", FIELDTYPE_INT)],
                           [(0, 10), (1, 11), (2, 12), (3, 13) ])
        table1 = AutoTransientTable(self.transientdb, mem1)

        mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
                            ("name", FIELDTYPE_INT)],
                           [(0, 10), (1, 11), (2, 12), (3, 13) ])
        table2 = AutoTransientTable(self.transientdb, mem2)

        # Only the second join is checked; the first one is performed
        # for its (formerly buggy) effect on the columns of table2.
        table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
                                     table2, "stretch_id", outer_join = True)
        table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
                                     table1, "stretch_id", outer_join = True)

        self.assertEquals([c.name for c in table.Columns()],
                          ["stretch_id", "name", "stretch_id_", "name_"])

    def test_transient_table_read_twice(self):
        """Test TransientTable.ReadRowAsDict() reading the same record twice"""
        simple = MemoryTable([("type", FIELDTYPE_STRING),
                              ("code", FIELDTYPE_INT)],
                             [("OTHER/UNKNOWN", 0),
                              ("RUINS", 1),
                              ("FARM", 2),
                              ("BUILDING", 3),
                              ("HUT", 4),
                              ("LIGHTHOUSE", 5)])
        table = TransientTable(self.transientdb, simple)

        # There was a bug where reading the same record twice would
        # raise an exception in the second call because of an
        # uninitialized local variable, so for passing the test it's
        # enough if reading simply succeeds. OTOH, while we're at it we
        # might as well check whether the results are equal anyway :)
        result1 = table.ReadRowAsDict(3)
        result2 = table.ReadRowAsDict(3)
        self.assertEquals(result1, result2)

    def test_transient_table_query(self):
        """Test TransientTable.SimpleQuery()"""
        simple = MemoryTable([("type", FIELDTYPE_STRING),
                              ("value", FIELDTYPE_DOUBLE),
                              ("code", FIELDTYPE_INT)],
                             [("OTHER/UNKNOWN", -1.5, 11),
                              ("RUINS", 0.0, 1),
                              ("FARM", 3.141, 2),
                              ("BUILDING", 2.5, 3),
                              ("HUT", 1e6, 4),
                              ("LIGHTHOUSE", -0.01, 5)])
        table = TransientTable(self.transientdb, simple)

        # A column and a value
        self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
                          [1])
        self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
                          [0, 1, 3, 4, 5])
        self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
                          [0, 1, 5])
        self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
                          [0])
        self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
                          [0, 4, 5])
        self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
                          [0, 3, 4, 5])

        # Two columns as operands
        self.assertEquals(table.SimpleQuery(table.Column(1),
                                            "<=", table.Column(2)),
                          [0, 1, 3, 5])

        # Test whether invalid operators raise a ValueError
        self.assertRaises(ValueError,
                          table.SimpleQuery,
                          table.Column(1), "<<", table.Column(2))

    def test_transienttable_to_dbf(self):
        """Test table_to_dbf() with a TransientTable

        The table is written to a temporary DBF file which is then read
        back with dbflib to check the records and the field
        descriptions.
        """
        memtable = MemoryTable([("type", FIELDTYPE_STRING),
                                ("value", FIELDTYPE_DOUBLE),
                                ("code", FIELDTYPE_INT)],
                               [("UNKNOWN", 0.0, 0),
                                ("Foo", 0.5, -1),
                                ("Foo", 1.0/256, 100),
                                ("bar", 1e10, 17)])
        table = TransientTable(self.transientdb, memtable)
        filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
        table_to_dbf(table, filename)

        dbf = dbflib.DBFFile(filename)
        try:
            self.assertEquals(dbf.read_record(2),
                              {'code': 100, 'type': 'Foo',
                               'value': 0.00390625})
            self.assertEquals(dbf.field_count(), 3)
            self.assertEquals(dbf.record_count(), 4)
            self.assertEquals(dbf.field_info(0),
                              (dbflib.FTString, "type", 7, 0))
            self.assertEquals(dbf.field_info(1),
                              (dbflib.FTDouble, "value", 24, 12))
            self.assertEquals(dbf.field_info(2),
                              (dbflib.FTInteger, "code", 3, 0))
        finally:
            # Release the file handle even if one of the assertions
            # above fails so that the temporary file is not kept open.
            dbf.close()
388    
389    
390    
# When executed as a script, hand over to the test runner helper of the
# support module.
if __name__ == "__main__":
    support.run_tests()

Properties

Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26