/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Annotation of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1924 - (hide annotations)
Fri Nov 7 12:07:11 2003 UTC (21 years, 4 months ago) by bh
Original Path: trunk/thuban/test/test_transientdb.py
File MIME type: text/x-python
File size: 18613 byte(s)
(TestTransientTable.test_auto_transient_table): Make sure that the
data is copied to the transient database at some point.

1 bh 765 # Copyright (c) 2002, 2003 by Intevation GmbH
2     # Authors:
3     # Bernhard Herzog <bh@intevation.de>
4     #
5     # This program is free software under the GPL (>=v2)
6     # Read the file COPYING coming with Thuban for details.
7    
8     """
9     Test the Transient DB classes
10     """
11    
12     __version__ = "$Revision$"
13     # $Source$
14     # $Id$
15    
16     import os
17     import unittest
18    
19     import support
20     support.initthuban()
21    
22 bh 1381 import dbflib
23 jan 805 from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24 bh 1381 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25 bh 765 from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26     TransientJoinedTable, AutoTransientTable
27    
28    
class TestTransientTable(unittest.TestCase, support.FileTestMixin):

    """Test cases for the table classes of Thuban.Model.transientdb

    Every test gets a fresh TransientDatabase as self.transientdb (see
    setUp) which is closed again in tearDown.
    """

    def setUp(self):
        """Create a transient database as self.transientdb"""
        filename = self.temp_file_name("transient_table.sqlite")
        if os.path.exists(filename):
            os.remove(filename)
        # A "-journal" file next to the database is presumably left over
        # from an earlier run.  Report it before removing it so that it
        # doesn't go unnoticed.
        journal = filename + "-journal"
        if os.path.exists(journal):
            # Written as a single parenthesized expression so that it
            # prints the same text with both the print statement and
            # the print function.
            print("removing journal %s" % journal)
            os.remove(journal)
        self.transientdb = TransientDatabase(filename)

    def tearDown(self):
        """Close the transient database created in setUp"""
        self.transientdb.close()

    def _iceland_dbf(self, basename):
        """Return a DBFTable for the file basename in ../Data/iceland"""
        return DBFTable(os.path.join("..", "Data", "iceland", basename))

    def _landmark_types(self):
        """Return a new MemoryTable with landmark type names and codes"""
        return MemoryTable([("type", FIELDTYPE_STRING),
                            ("code", FIELDTYPE_INT)],
                           [("OTHER/UNKNOWN", 0),
                            ("RUINS", 1),
                            ("FARM", 2),
                            ("BUILDING", 3),
                            ("HUT", 4),
                            ("LIGHTHOUSE", 5)])

    def run_iceland_political_tests(self, table):
        """Run some tests on table

        Assume that table holds the data of the file
        ../Data/iceland/political.dbf sample file.
        """
        self.assertEqual(table.NumRows(), 156)
        self.assertEqual(table.NumColumns(), 8)

        # Check one each of the possible field types.
        columns = table.Columns()
        self.assertEqual(columns[0].name, 'AREA')
        self.assertEqual(columns[0].type, FIELDTYPE_DOUBLE)
        self.assertEqual(columns[3].name, 'PONET_ID')
        self.assertEqual(columns[3].type, FIELDTYPE_INT)
        self.assertEqual(columns[6].name, 'POPYCOUN')
        self.assertEqual(columns[6].type, FIELDTYPE_STRING)

        # HasColumn
        self.assertTrue(table.HasColumn("AREA"))
        self.assertTrue(table.HasColumn(1))
        # HasColumn for non-existing columns
        self.assertFalse(table.HasColumn("non_existing_name"))
        self.assertFalse(table.HasColumn(100))

        # Reading rows and values.
        expected_row = {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
                        'AREA': 19.462,
                        'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
                        'POPYREG': '1',
                        'PONET_ID': 145}
        self.assertEqual(table.ReadRowAsDict(144), expected_row)
        self.assertEqual(table.ReadRowAsDict(144, row_is_ordinal = 1),
                         expected_row)
        self.assertEqual(table.ReadValue(144, "AREA"), 19.462)
        self.assertEqual(table.ReadValue(144, 3), 145)
        self.assertEqual(table.ReadValue(144, "AREA", row_is_ordinal = 1),
                         19.462)
        self.assertEqual(table.ReadValue(144, 3, row_is_ordinal = 1), 145)

        self.assertEqual(table.RowIdToOrdinal(23), 23)
        self.assertEqual(table.RowOrdinalToId(23), 23)

        # ValueRange may induce a copy to the transient database.
        # Therefore we put it last so that we can execute this method
        # twice to check whether the other methods still work after the
        # copy
        self.assertEqual(table.ValueRange("AREA"), (0.0, 19.462))

        unique = table.UniqueValues("PONET_ID")
        unique.sort()
        # list() so that the comparison also holds where range does not
        # return a list.
        self.assertEqual(unique, list(range(1, 157)))

    def test_transient_table(self):
        """Test TransientTable(dbftable)

        The TransientTable should copy the data to the
        TransientDatabase.
        """
        orig_table = self._iceland_dbf("political.dbf")
        table = TransientTable(self.transientdb, orig_table)
        self.run_iceland_political_tests(table)

        # The transient_table method should return the table itself
        self.assertTrue(table is table.transient_table())

        # The title is simply copied over from the original table
        self.assertEqual(table.Title(), orig_table.Title())

        # The TransientTable class itself doesn't implement the
        # Dependencies method, so we don't test it.

    def test_auto_transient_table(self):
        """Test AutoTransientTable(dbftable)

        The AutoTransientTable should copy the data to the
        TransientDatabase on demand.
        """
        orig_table = self._iceland_dbf("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)

        # Run the tests twice so that we execute them once when the data
        # has not been copied to the transient db yet and once when it
        # has.
        self.run_iceland_political_tests(table)
        # At this point the data has probably not been copied to the
        # transient DB yet, so we force it by calling the
        # transient_table method.
        table.transient_table()

        # Run the tests again.
        self.run_iceland_political_tests(table)

    def test_auto_transient_table_query(self):
        """Test AutoTransientTable.SimpleQuery()"""
        orig_table = self._iceland_dbf("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)
        # Only a simple test here. The AutoTransientTable simply
        # delegates to its transient table so it should be OK that the
        # real test for it is in test_transient_table_query. However,
        # it's important to check that the column handling works
        # correctly because the AutoTransientTable and its underlying
        # transient table use different column object types.
        self.assertEqual(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
                         [144])

        # test using a Column object as the right parameter
        self.assertEqual(table.SimpleQuery(table.Column("POPYTYPE"),
                                           "==",
                                           table.Column("POPYREG")),
                         list(range(156)))

    def test_auto_transient_table_dependencies(self):
        """Test AutoTransientTable.Dependencies()"""
        orig_table = self._iceland_dbf("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)
        self.assertEqual(table.Dependencies(), (orig_table,))

    def test_auto_transient_table_title(self):
        """Test AutoTransientTable.Title()"""
        orig_table = self._iceland_dbf("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)
        # The title is of course the same as that of the original table
        self.assertEqual(table.Title(), orig_table.Title())

    def test_transient_joined_table(self):
        """Test TransientJoinedTable"""
        auto = AutoTransientTable(self.transientdb, self._landmark_types())
        landmarks = AutoTransientTable(
            self.transientdb, self._iceland_dbf("cultural_landmark-point.dbf"))

        table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
                                     auto, "type")

        self.assertEqual(table.NumRows(), 34)
        self.assertEqual(table.NumColumns(), 8)
        self.assertEqual(table.Column(0).type, FIELDTYPE_DOUBLE)
        self.assertEqual(table.Column(0).name, 'AREA')
        self.assertEqual(table.Column(7).type, FIELDTYPE_INT)
        self.assertEqual(table.Column(7).name, 'code')
        self.assertEqual(table.Column(4).type, FIELDTYPE_STRING)
        self.assertEqual(table.Column(4).name, 'CLPTLABEL')
        # HasColumn
        self.assertTrue(table.HasColumn("AREA"))
        self.assertTrue(table.HasColumn(1))
        # HasColumn for non-existing columns
        self.assertFalse(table.HasColumn("non_existing_name"))
        self.assertFalse(table.HasColumn(100))

        # Reading rows and values
        self.assertEqual(table.ReadRowAsDict(22),
                         {'PERIMETER': 0.0, 'CLPOINT_': 23,
                          'AREA': 0.0, 'CLPTLABEL': 'RUINS',
                          'CLPOINT_ID': 38, 'CLPTFLAG': 0,
                          'code': 1, 'type': 'RUINS'})
        self.assertEqual(table.ReadValue(22, "type"), 'RUINS')
        self.assertEqual(table.ReadValue(22, 7), 1)
        self.assertEqual(table.ReadValue(22, "type", row_is_ordinal = 1),
                         "RUINS")
        self.assertEqual(table.ReadValue(22, 7, row_is_ordinal = 1), 1)
        self.assertEqual(table.RowIdToOrdinal(23), 23)
        self.assertEqual(table.RowOrdinalToId(23), 23)

        # The transient_table method should return the table itself
        self.assertTrue(table is table.transient_table())

        # The TransientJoinedTable depends on both input tables
        self.assertEqual(table.Dependencies(), (landmarks, auto))

        # The title is constructed from the titles of the input tables.
        self.assertEqual(table.Title(),
                         "Join of %s and %s" % (landmarks.Title(),
                                                auto.Title()))

    def test_transient_joined_table_same_column_name(self):
        """Test TransientJoinedTable join on columns with same name

        The transient DB maps the column names used by the tables to
        another set of names used only inside the SQLite database. There
        was a bug in the way this mapping was used when joining on
        fields with the same names in both tables so that the joined
        table ended up joining on the same column in the same table.
        """
        mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
                                    [(i,) for i in range(4)])
        stretches = AutoTransientTable(self.transientdb, mem_stretches)

        mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
                                      ("stretch_id", FIELDTYPE_INT)],
                                     [(1, 0), (2, 3)])
        discharges = AutoTransientTable(self.transientdb, mem_discharges)

        table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
                                     discharges, "stretch_id",
                                     outer_join = True)

        self.assertEqual(table.NumRows(), 4)
        self.assertEqual(table.NumColumns(), 3)

        # HasColumn
        self.assertTrue(table.HasColumn("stretch_id"))
        self.assertTrue(table.HasColumn("disch_id"))

    def test_transient_joined_table_with_equal_column_names(self):
        """Test TransientJoinedTable join on tables with equal column names

        If a name collision occurs for the field names, underscores are
        appended until the collision is resolved.
        """
        mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
                                     ("name", FIELDTYPE_INT)],
                                    [(0, 10), (1, 11), (2, 12), (3, 13)])
        stretches = AutoTransientTable(self.transientdb, mem_stretches)

        mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
                                      ("stretch_id", FIELDTYPE_INT),
                                      ("name", FIELDTYPE_INT)],
                                     [(1, 0, 1), (2, 3, 2)])
        discharges = AutoTransientTable(self.transientdb, mem_discharges)

        table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
                                     discharges, "stretch_id",
                                     outer_join = True)

        self.assertEqual(table.NumRows(), 4)
        self.assertEqual(table.NumColumns(), 5)

        # The colliding names of the right table get an underscore
        # appended.
        self.assertEqual([c.name for c in table.Columns()],
                         ["stretch_id", "name", "disch_id", "stretch_id_",
                          "name_"])

    def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
        """Test TransientJoinedTable name-collisions do not modify in place

        The name collision work-around by appending underscores
        accidentally modified the column objects in place. We do two
        joins therefore in reverse order to detect this: The first join
        will lead to a modified name in the column object of the right
        table which is then used as the left table in the second join so
        the underscored name will appear before the non-underscored one
        in the list of column names after the second join.
        """
        mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
                            ("name", FIELDTYPE_INT)],
                           [(0, 10), (1, 11), (2, 12), (3, 13)])
        table1 = AutoTransientTable(self.transientdb, mem1)

        mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
                            ("name", FIELDTYPE_INT)],
                           [(0, 10), (1, 11), (2, 12), (3, 13)])
        table2 = AutoTransientTable(self.transientdb, mem2)

        # The first join is only done for its potential side effect on
        # the column objects of table2.  Only the second join is
        # examined below.
        table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
                                     table2, "stretch_id", outer_join = True)
        table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
                                     table1, "stretch_id", outer_join = True)

        self.assertEqual([c.name for c in table.Columns()],
                         ["stretch_id", "name", "stretch_id_", "name_"])

    def test_transient_table_read_twice(self):
        """Test TransientTable.ReadRowAsDict() reading the same record twice"""
        table = TransientTable(self.transientdb, self._landmark_types())

        # There was a bug where reading the same record twice would
        # raise an exception in the second call because of an
        # uninitialized local variable, so for passing the test it's
        # enough if reading simply succeeds. OTOH, while we're at it we
        # might as well check whether the results are equal anyway :)
        result1 = table.ReadRowAsDict(3)
        result2 = table.ReadRowAsDict(3)
        self.assertEqual(result1, result2)

    def test_transient_table_query(self):
        """Test TransientTable.SimpleQuery()"""
        simple = MemoryTable([("type", FIELDTYPE_STRING),
                              ("value", FIELDTYPE_DOUBLE),
                              ("code", FIELDTYPE_INT)],
                             [("OTHER/UNKNOWN", -1.5, 11),
                              ("RUINS", 0.0, 1),
                              ("FARM", 3.141, 2),
                              ("BUILDING", 2.5, 3),
                              ("HUT", 1e6, 4),
                              ("LIGHTHOUSE", -0.01, 5)])
        table = TransientTable(self.transientdb, simple)

        # A column and a value
        self.assertEqual(table.SimpleQuery(table.Column(0), "==", "RUINS"),
                         [1])
        self.assertEqual(table.SimpleQuery(table.Column(2), "!=", 2),
                         [0, 1, 3, 4, 5])
        self.assertEqual(table.SimpleQuery(table.Column(1), "<", 1.0),
                         [0, 1, 5])
        self.assertEqual(table.SimpleQuery(table.Column(1), "<=", -1.5),
                         [0])
        self.assertEqual(table.SimpleQuery(table.Column(2), ">", 3),
                         [0, 4, 5])
        self.assertEqual(table.SimpleQuery(table.Column(2), ">=", 3),
                         [0, 3, 4, 5])

        # Two columns as operands
        self.assertEqual(table.SimpleQuery(table.Column(1),
                                           "<=", table.Column(2)),
                         [0, 1, 3, 5])

        # Test whether invalid operators raise a ValueError
        self.assertRaises(ValueError,
                          table.SimpleQuery,
                          table.Column(1), "<<", table.Column(2))

    def test_transienttable_to_dbf(self):
        """Test table_to_dbf() with a TransientTable as input"""
        memtable = MemoryTable([("type", FIELDTYPE_STRING),
                                ("value", FIELDTYPE_DOUBLE),
                                ("code", FIELDTYPE_INT)],
                               [("UNKNOWN", 0.0, 0),
                                ("Foo", 0.5, -1),
                                ("Foo", 1.0/256, 100),
                                ("bar", 1e10, 17)])
        table = TransientTable(self.transientdb, memtable)
        filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
        table_to_dbf(table, filename)

        # Read the written file back with dbflib and check contents and
        # field definitions.
        dbf = dbflib.DBFFile(filename)
        self.assertEqual(dbf.read_record(2),
                         {'code': 100, 'type': 'Foo', 'value': 0.00390625})
        self.assertEqual(dbf.field_count(), 3)
        self.assertEqual(dbf.record_count(), 4)
        self.assertEqual(dbf.field_info(0),
                         (dbflib.FTString, "type", 7, 0))
        self.assertEqual(dbf.field_info(1),
                         (dbflib.FTDouble, "value", 24, 12))
        self.assertEqual(dbf.field_info(2),
                         (dbflib.FTInteger, "code", 3, 0))
412    
413    
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    support.run_tests()

Properties

Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26