/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Annotation of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory | Revision Log


Revision 1662 - (hide annotations)
Wed Aug 27 13:51:01 2003 UTC (21 years, 6 months ago) by bh
Original Path: trunk/thuban/test/test_transientdb.py
File MIME type: text/x-python
File size: 18388 byte(s)
Make the table interface distinguish between row ids (an integer
that uniquely identifies a row) and row ordinals (a simple row
count from 0 to NumRows() - 1)

* Thuban/Model/postgisdb.py (PostGISTable.RowIdToOrdinal)
(PostGISTable.RowOrdinalToId): New methods to convert between row
ids and row ordinals
(PostGISTable.ReadRowAsDict, PostGISTable.ReadValue): New keyword
parameter row_is_ordinal to indicate whether the row parameter is
the row id or the ordinal

* Thuban/Model/transientdb.py (TransientTableBase.RowIdToOrdinal)
(TransientTableBase.RowOrdinalToId)
(AutoTransientTable.RowIdToOrdinal)
(AutoTransientTable.RowOrdinalToId): Same new methods as in
PostGISTable.
(TransientTableBase.ReadRowAsDict, TransientTableBase.ReadValue)
(AutoTransientTable.ReadRowAsDict, AutoTransientTable.ReadValue):
Same new parameter as in PostGISTable.

* Thuban/Model/table.py (DBFTable.RowIdToOrdinal)
(DBFTable.RowOrdinalToId, MemoryTable.RowIdToOrdinal)
(MemoryTable.RowOrdinalToId): Same new methods as in PostGISTable.
(DBFTable.ReadValue, DBFTable.ReadRowAsDict)
(MemoryTable.ReadValue, MemoryTable.ReadRowAsDict): Same new
parameter as in PostGISTable.

* Thuban/UI/tableview.py (DataTable.RowIdToOrdinal)
(DataTable.RowOrdinalToId): New methods to convert between row ids
and row ordinals.
(TableGrid.SelectRowById): New method to select a row based on its
ID as opposed to its ordinal
(DataTable.GetValue, TableGrid.OnRangeSelect)
(TableGrid.OnSelectCell, LayerTableGrid.select_shapes)
(QueryTableFrame.OnQuery, QueryTableFrame.get_selected)
(LayerTableFrame.__init__): Convert between row ids and row
ordinals as appropriate

* test/postgissupport.py (PostGISDatabase.__init__): Add
doc-string.
(PostGISDatabase.initdb): The optional third item in a tuple in
tables is now a (key, value) list with additional arguments to
pass to upload_shapefile
(upload_shapefile): New parameter gid_offset to allow gids that
are not the same as the shapeids in the shapefile
(PostgreSQLServer.get_default_static_data_db): Use the new
gid_offset to make the gids in landmarks 1000 higher than the
shapeids in the shapefile

* test/test_viewport.py
(TestViewportWithPostGIS.test_find_shape_at_point): Adapt to the
new shapeids in the landmarks table

* test/test_transientdb.py
(TestTransientTable.run_iceland_political_tests)
(TestTransientTable.test_transient_joined_table): Add tests for
the new table methods and new keyword arguments.

* test/test_postgis_db.py
(TestPostGISTable.test_read_row_as_dict_row_count_mode)
(TestPostGISTable.test_read_value_row_count_mode)
(TestPostGISTable.test_row_id_to_ordinal)
(TestPostGISTable.test_row_oridnal_to_id): New test for the new
table methods and the new arguments
(TestPostGISShapestorePoint.test_shapes_in_region)
(TestPostGISShapestorePoint.test_shape_raw_data)
(TestPostGISShapestorePoint.test_shape_points)
(TestPostGISShapestorePoint.test_shape_shapeid)
(TestPostGISShapestorePoint.test_all_shapes)
(TestPostGISTable.test_simple_query)
(TestPostGISTable.test_simple_query)
(TestPostGISTable.test_simple_query)
(TestPostGISTable.test_read_value)
(TestPostGISTable.test_read_row_as_dict): Adapt to the new
shapeids in the landmarks table

* test/test_memory_table.py
(TestMemoryTable.test_read_row_as_dict_row_count_mode)
(TestMemoryTable.test_read_value_row_count_mode)
(TestMemoryTable.test_row_id_to_ordinal)
(TestMemoryTable.test_row_oridnal_to_id): New test for the new
table methods and the new arguments

* test/test_dbf_table.py
(TestDBFTable.test_read_row_as_dict_row_count_mode)
(TestDBFTable.test_read_value_row_count_mode)
(TestDBFTable.test_row_id_to_ordinal)
(TestDBFTable.test_row_oridnal_to_id): New test for the new table
methods and the new arguments

1 bh 765 # Copyright (c) 2002, 2003 by Intevation GmbH
2     # Authors:
3     # Bernhard Herzog <[email protected]>
4     #
5     # This program is free software under the GPL (>=v2)
6     # Read the file COPYING coming with Thuban for details.
7    
8     """
9     Test the Transient DB classes
10     """
11    
12     __version__ = "$Revision$"
13     # $Source$
14     # $Id$
15    
16     import os
17     import unittest
18    
19     import support
20     support.initthuban()
21    
22 bh 1381 import dbflib
23 jan 805 from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24 bh 1381 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25 bh 765 from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26     TransientJoinedTable, AutoTransientTable
27    
28    
29     class TestTransientTable(unittest.TestCase, support.FileTestMixin):
30    
31     def setUp(self):
32     """Create a transient database as self.transientdb"""
33     filename = self.temp_file_name("transient_table.sqlite")
34     if os.path.exists(filename):
35     os.remove(filename)
36     journal = filename + "-journal"
37     if os.path.exists(journal):
38     print "removing journal", journal
39     os.remove(journal)
40     self.transientdb = TransientDatabase(filename)
41    
42     def tearDown(self):
43     self.transientdb.close()
44    
45     def run_iceland_political_tests(self, table):
46     """Run some tests on tablte
47    
48     Assume that table holds the data of the file
49     ../Data/iceland/political.dbf sample file.
50     """
51 bh 818 self.assertEquals(table.NumRows(), 156)
52     self.assertEquals(table.NumColumns(), 8)
53 bh 765
54 bh 1041 # Check one each of the possible field types.
55 bh 818 columns = table.Columns()
56     self.assertEquals(columns[0].name, 'AREA')
57 bh 839 self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
58 bh 818 self.assertEquals(columns[3].name, 'PONET_ID')
59 bh 839 self.assertEquals(columns[3].type, FIELDTYPE_INT)
60 bh 818 self.assertEquals(columns[6].name, 'POPYCOUN')
61 bh 839 self.assertEquals(columns[6].type, FIELDTYPE_STRING)
62 bh 765
63 bh 839 # HasColumn
64     self.failUnless(table.HasColumn("AREA"))
65     self.failUnless(table.HasColumn(1))
66     # HasColumn for non-exisiting columns
67     self.failIf(table.HasColumn("non_existing_name"))
68     self.failIf(table.HasColumn(100))
69    
70 bh 849 # Reading rows and values.
71 bh 818 self.assertEquals(table.ReadRowAsDict(144),
72 bh 765 {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
73     'AREA': 19.462,
74     'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
75     'POPYREG': '1',
76     'PONET_ID': 145})
77 bh 1662 self.assertEquals(table.ReadRowAsDict(144, row_is_ordinal = 1),
78     {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
79     'AREA': 19.462,
80     'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
81     'POPYREG': '1',
82     'PONET_ID': 145})
83 bh 849 self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
84     self.assertEquals(table.ReadValue(144, 3), 145)
85 bh 1662 self.assertEquals(table.ReadValue(144, "AREA", row_is_ordinal = 1),
86     19.462)
87     self.assertEquals(table.ReadValue(144, 3, row_is_ordinal = 1), 145)
88 bh 765
89 bh 1662 self.assertEquals(table.RowIdToOrdinal(23), 23)
90     self.assertEquals(table.RowOrdinalToId(23), 23)
91    
92 bh 839 # ValueRange may induce a copy to the transient database.
93 bh 765 # Therefore we put it last so that we can execute this method
94     # twice to check whether the other methods still work after the
95     # copy
96 bh 818 self.assertEquals(table.ValueRange("AREA"), (0.0, 19.462))
97 bh 765
98 bh 818 unique = table.UniqueValues("PONET_ID")
99 bh 765 unique.sort()
100     self.assertEquals(unique, range(1, 157))
101    
102     def test_transient_table(self):
103     """Test TransientTable(dbftable)
104    
105     The TransientTable should copy the data to the
106     TransientDatabase.
107     """
108     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
109     "political.dbf"))
110     table = TransientTable(self.transientdb, orig_table)
111     self.run_iceland_political_tests(table)
112    
113     # The transient_table method should return the table itself
114     self.assert_(table is table.transient_table())
115    
116 bh 998 # The title is simply copied over from the original table
117     self.assertEquals(table.Title(), orig_table.Title())
118    
119 bh 984 # The TransientTable class itself doesn't implement the
120     # Dependencies method, so we don't test it.
121 bh 765
122 bh 984
123 bh 765 def test_auto_transient_table(self):
124     """Test AutoTransientTable(dbftable)
125    
126     The AutoTransientTable should copy the data to the
127     TransientDatabase on demand.
128     """
129     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
130     "political.dbf"))
131     table = AutoTransientTable(self.transientdb, orig_table)
132    
133     # Run the tests twice so that we execute them once when the data
134     # has not been copied to the transient db yet and once when it
135     # has. This assumes that run_iceland_political_tests does at
136     # least one call to a method that copies to the transient db at
137     # its end.
138     self.run_iceland_political_tests(table)
139     self.run_iceland_political_tests(table)
140    
141 bh 845 def test_auto_transient_table_query(self):
142     """Test AutoTransientTable.SimpleQuery()"""
143     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
144     "political.dbf"))
145     table = AutoTransientTable(self.transientdb, orig_table)
146 jonathan 948 # Only a simple test here. The AutoTransientTable simply
147 bh 845 # delegates to its transient table so it should be OK that the
148     # real test for it is in test_transient_table_query. However,
149     # it's important to check that the column handling works
150     # correctly because the AutoTransientTable and it's underlying
151     # transient table use different column object types.
152     self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
153     [144])
154 bh 765
155 jonathan 948 # test using a Column object as the right parameter
156 bh 984 self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
157 jonathan 948 "==",
158     table.Column("POPYREG")),
159     range(156))
160    
161 bh 984 def test_auto_transient_table_dependencies(self):
162     """Test AutoTransientTable.Dependencies()"""
163     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
164     "political.dbf"))
165     table = AutoTransientTable(self.transientdb, orig_table)
166     self.assertEquals(table.Dependencies(), (orig_table,))
167    
168 bh 998 def test_auto_transient_table_title(self):
169     """Test AutoTransientTable.Title()"""
170     orig_table = DBFTable(os.path.join("..", "Data", "iceland",
171     "political.dbf"))
172     table = AutoTransientTable(self.transientdb, orig_table)
173     # The title is of course the same as that of the original table
174     self.assertEquals(table.Title(), orig_table.Title())
175    
176 bh 765 def test_transient_joined_table(self):
177     """Test TransientJoinedTable"""
178 jan 805 simple = MemoryTable([("type", FIELDTYPE_STRING),
179 bh 765 ("code", FIELDTYPE_INT)],
180     [("OTHER/UNKNOWN", 0),
181     ("RUINS", 1),
182     ("FARM", 2),
183     ("BUILDING", 3),
184     ("HUT", 4),
185     ("LIGHTHOUSE", 5)])
186     auto = AutoTransientTable(self.transientdb, simple)
187     filename = os.path.join("..", "Data", "iceland",
188     "cultural_landmark-point.dbf")
189     landmarks = AutoTransientTable(self.transientdb, DBFTable(filename))
190    
191     table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
192     auto, "type")
193    
194 bh 818 self.assertEquals(table.NumRows(), 34)
195     self.assertEquals(table.NumColumns(), 8)
196 bh 839 self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
197     self.assertEquals(table.Column(0).name, 'AREA')
198     self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
199     self.assertEquals(table.Column(7).name, 'code')
200     self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
201     self.assertEquals(table.Column(4).name, 'CLPTLABEL')
202     # HasColumn
203     self.failUnless(table.HasColumn("AREA"))
204     self.failUnless(table.HasColumn(1))
205     # HasColumn for non-exisiting columns
206     self.failIf(table.HasColumn("non_existing_name"))
207     self.failIf(table.HasColumn(100))
208 bh 765
209 bh 849 # Reading rows and values
210 bh 839 self.assertEquals(table.ReadRowAsDict(22),
211 bh 765 {'PERIMETER': 0.0, 'CLPOINT_': 23,
212     'AREA': 0.0, 'CLPTLABEL': 'RUINS',
213     'CLPOINT_ID': 38, 'CLPTFLAG': 0,
214     'code': 1, 'type': 'RUINS'})
215 bh 849 self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
216     self.assertEquals(table.ReadValue(22, 7), 1)
217 bh 1662 self.assertEquals(table.ReadValue(22, "type", row_is_ordinal = 1),
218     "RUINS")
219     self.assertEquals(table.ReadValue(22, 7, row_is_ordinal = 1), 1)
220     self.assertEquals(table.RowIdToOrdinal(23), 23)
221     self.assertEquals(table.RowOrdinalToId(23), 23)
222 bh 765
223     # The transient_table method should return the table itself
224     self.assert_(table is table.transient_table())
225    
226 bh 984 # The TransientJoinedTable depends on both input tables
227     self.assertEquals(table.Dependencies(), (landmarks, auto))
228 bh 765
229 bh 998 # The title is constructed from the titles of the input tables.
230     self.assertEquals(table.Title(),
231     "Join of %s and %s" % (landmarks.Title(),
232     auto.Title()))
233 bh 984
234 bh 998
235 bh 1328 def test_transient_joined_table_same_column_name(self):
236     """Test TransientJoinedTable join on columns with same name
237    
238     The transient DB maps the column names used by the tables to
239     another set of names used only inside the SQLite database. There
240     was a bug in the way this mapping was used when joining on
241     fields with the same names in both tables so that the joined
242     table ended up joining on the same column in the same table.
243     """
244     mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
245     [(i,) for i in range(4)])
246     stretches = AutoTransientTable(self.transientdb, mem_stretches)
247    
248     mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
249     ("stretch_id", FIELDTYPE_INT)],
250     [(1, 0), (2, 3)])
251     discharges = AutoTransientTable(self.transientdb, mem_discharges)
252    
253     table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
254     discharges, "stretch_id",
255     outer_join = True)
256    
257     self.assertEquals(table.NumRows(), 4)
258 bh 1364 self.assertEquals(table.NumColumns(), 3)
259 bh 1328
260     # HasColumn
261     self.failUnless(table.HasColumn("stretch_id"))
262     self.failUnless(table.HasColumn("disch_id"))
263    
264    
265 frank 1332 def test_transient_joined_table_with_equal_column_names(self):
266     """Test TransientJoinedTable join on tables with equal column names
267    
268 bh 1364 If a name collision occurs for the field names, underscores are
269     appended as long as any collision is resolved.
270 frank 1332 """
271     mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
272     ("name", FIELDTYPE_INT)],
273     [(0, 10), (1, 11), (2, 12), (3, 13) ])
274     stretches = AutoTransientTable(self.transientdb, mem_stretches)
275    
276     mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
277     ("stretch_id", FIELDTYPE_INT),
278     ("name", FIELDTYPE_INT)],
279     [(1, 0, 1), (2, 3, 2)])
280     discharges = AutoTransientTable(self.transientdb, mem_discharges)
281    
282     table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
283     discharges, "stretch_id",
284     outer_join = True)
285    
286     self.assertEquals(table.NumRows(), 4)
287 bh 1364 self.assertEquals(table.NumColumns(), 5)
288 frank 1332
289     # HasColumn
290 bh 1364 self.assertEquals([c.name for c in table.Columns()],
291     ["stretch_id", "name", "disch_id", "stretch_id_",
292     "name_"])
293 frank 1332
294 bh 1364 def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
295     """Test TransientJoinedTable name-collisions do not modifying in place
296 frank 1332
297 bh 1364 The name collision work-around by appending underscores
298     accidentally modified the column objects in place. We do two
299     joins therefore in reverse order to detect this: The first join
300     will lead to a modified name in the column object of the right
301     table which is then used as the left table in the second join so
302     the underscored name will appear before the non-underscored one
303     in the list of column names after the second join.
304     """
305     mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
306     ("name", FIELDTYPE_INT)],
307     [(0, 10), (1, 11), (2, 12), (3, 13) ])
308     table1 = AutoTransientTable(self.transientdb, mem1)
309    
310     mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
311     ("name", FIELDTYPE_INT)],
312     [(0, 10), (1, 11), (2, 12), (3, 13) ])
313     table2 = AutoTransientTable(self.transientdb, mem2)
314    
315     table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
316     table2, "stretch_id", outer_join = True)
317     table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
318     table1, "stretch_id", outer_join = True)
319    
320     self.assertEquals([c.name for c in table.Columns()],
321     ["stretch_id", "name", "stretch_id_", "name_"])
322    
323 bh 785 def test_transient_table_read_twice(self):
324 bh 842 """Test TransientTable.ReadRowAsDict() reading the same record twice"""
325 jan 805 simple = MemoryTable([("type", FIELDTYPE_STRING),
326 bh 785 ("code", FIELDTYPE_INT)],
327     [("OTHER/UNKNOWN", 0),
328     ("RUINS", 1),
329     ("FARM", 2),
330     ("BUILDING", 3),
331     ("HUT", 4),
332     ("LIGHTHOUSE", 5)])
333     table = TransientTable(self.transientdb, simple)
334 bh 765
335 bh 785 # There was a bug where reading the same record twice would
336     # raise an exception in the second call because of an
337     # unitialized local variable, so for passing the test it's
338     # enough if reading simply succeeds. OTOH, while we're at it we
339     # might as well check whether the results are equal anyway :)
340 bh 839 result1 = table.ReadRowAsDict(3)
341     result2 = table.ReadRowAsDict(3)
342 bh 785 self.assertEquals(result1, result2)
343    
344 bh 818
345 bh 842 def test_transient_table_query(self):
346     """Test TransientTable.SimpleQuery()"""
347     simple = MemoryTable([("type", FIELDTYPE_STRING),
348     ("value", FIELDTYPE_DOUBLE),
349     ("code", FIELDTYPE_INT)],
350     [("OTHER/UNKNOWN", -1.5, 11),
351     ("RUINS", 0.0, 1),
352     ("FARM", 3.141, 2),
353     ("BUILDING", 2.5, 3),
354     ("HUT", 1e6, 4),
355     ("LIGHTHOUSE", -0.01, 5)])
356     table = TransientTable(self.transientdb, simple)
357    
358     # A column and a value
359     self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
360     [1])
361     self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
362     [0, 1, 3, 4, 5])
363     self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
364     [0, 1, 5])
365     self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
366     [0])
367     self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
368     [0, 4, 5])
369     self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
370     [0, 3, 4, 5])
371    
372     # Two columns as operands
373     self.assertEquals(table.SimpleQuery(table.Column(1),
374     "<=", table.Column(2)),
375     [0, 1, 3, 5])
376    
377     # Test whether invalid operators raise a ValueError
378     self.assertRaises(ValueError,
379     table.SimpleQuery,
380     table.Column(1), "<<", table.Column(2))
381    
382 bh 1381 def test_transienttable_to_dbf(self):
383     memtable = MemoryTable([("type", FIELDTYPE_STRING),
384     ("value", FIELDTYPE_DOUBLE),
385     ("code", FIELDTYPE_INT)],
386     [("UNKNOWN", 0.0, 0),
387     ("Foo", 0.5, -1),
388     ("Foo", 1.0/256, 100),
389     ("bar", 1e10, 17)])
390     table = TransientTable(self.transientdb, memtable)
391     filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
392     table_to_dbf(table, filename)
393 bh 842
394 bh 1381 dbf = dbflib.DBFFile(filename)
395     self.assertEquals(dbf.read_record(2),
396     {'code': 100, 'type': 'Foo', 'value': 0.00390625})
397     self.assertEquals(dbf.field_count(), 3)
398     self.assertEquals(dbf.record_count(), 4)
399     self.assertEquals(dbf.field_info(0),
400     (dbflib.FTString, "type", 7, 0))
401     self.assertEquals(dbf.field_info(1),
402     (dbflib.FTDouble, "value", 24, 12))
403     self.assertEquals(dbf.field_info(2),
404     (dbflib.FTInteger, "code", 3, 0))
405    
406    
407    
# When executed as a script, hand control to the test runner of the
# support module.
if __name__ == "__main__":
    support.run_tests()

Properties

Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26