/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Contents of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1662 - (show annotations)
Wed Aug 27 13:51:01 2003 UTC (21 years, 6 months ago) by bh
Original Path: trunk/thuban/test/test_transientdb.py
File MIME type: text/x-python
File size: 18388 byte(s)
Make the table interface distinguish between row ids (an integer
that uniquely identifies a row) and row ordinals (a simple row
count from 0 to NumRows() - 1)

* Thuban/Model/postgisdb.py (PostGISTable.RowIdToOrdinal)
(PostGISTable.RowOrdinalToId): New methods to convert between row
ids and row ordinals
(PostGISTable.ReadRowAsDict, PostGISTable.ReadValue): New keyword
parameter row_is_ordinal to indicate whether the row parameter is
the row id or the ordinal

* Thuban/Model/transientdb.py (TransientTableBase.RowIdToOrdinal)
(TransientTableBase.RowOrdinalToId)
(AutoTransientTable.RowIdToOrdinal)
(AutoTransientTable.RowOrdinalToId): Same new methods as in
PostGISTable.
(TransientTableBase.ReadRowAsDict, TransientTableBase.ReadValue)
(AutoTransientTable.ReadRowAsDict, AutoTransientTable.ReadValue):
Same new parameter as in PostGISTable.

* Thuban/Model/table.py (DBFTable.RowIdToOrdinal)
(DBFTable.RowOrdinalToId, MemoryTable.RowIdToOrdinal)
(MemoryTable.RowOrdinalToId): Same new methods as in PostGISTable.
(DBFTable.ReadValue, DBFTable.ReadRowAsDict)
(MemoryTable.ReadValue, MemoryTable.ReadRowAsDict): Same new
parameter as in PostGISTable.

* Thuban/UI/tableview.py (DataTable.RowIdToOrdinal)
(DataTable.RowOrdinalToId): New methods to convert between row ids
and row ordinals.
(TableGrid.SelectRowById): New method to select a row based on its
ID as opposed to its ordinal
(DataTable.GetValue, TableGrid.OnRangeSelect)
(TableGrid.OnSelectCell, LayerTableGrid.select_shapes)
(QueryTableFrame.OnQuery, QueryTableFrame.get_selected)
(LayerTableFrame.__init__): Convert between row ids and row
ordinals as appropriate

* test/postgissupport.py (PostGISDatabase.__init__): Add
doc-string.
(PostGISDatabase.initdb): The optional third item in a tuple in
tables is now a (key, value) list with additional arguments to
pass to upload_shapefile
(upload_shapefile): New parameter gid_offset to allow gids that
are not the same as the shapeids in the shapefile
(PostgreSQLServer.get_default_static_data_db): Use the new
gid_offset to make the gids in landmarks 1000 higher than the
shapeids in the shapefile

* test/test_viewport.py
(TestViewportWithPostGIS.test_find_shape_at_point): Adapt to the
new shapeids in the landmarks table

* test/test_transientdb.py
(TestTransientTable.run_iceland_political_tests)
(TestTransientTable.test_transient_joined_table): Add tests for
the new table methods and new keyword arguments.

* test/test_postgis_db.py
(TestPostGISTable.test_read_row_as_dict_row_count_mode)
(TestPostGISTable.test_read_value_row_count_mode)
(TestPostGISTable.test_row_id_to_ordinal)
(TestPostGISTable.test_row_oridnal_to_id): New test for the new
table methods and the new arguments
(TestPostGISShapestorePoint.test_shapes_in_region)
(TestPostGISShapestorePoint.test_shape_raw_data)
(TestPostGISShapestorePoint.test_shape_points)
(TestPostGISShapestorePoint.test_shape_shapeid)
(TestPostGISShapestorePoint.test_all_shapes)
(TestPostGISTable.test_simple_query)
(TestPostGISTable.test_simple_query)
(TestPostGISTable.test_simple_query)
(TestPostGISTable.test_read_value)
(TestPostGISTable.test_read_row_as_dict): Adapt to the new
shapeids in the landmarks table

* test/test_memory_table.py
(TestMemoryTable.test_read_row_as_dict_row_count_mode)
(TestMemoryTable.test_read_value_row_count_mode)
(TestMemoryTable.test_row_id_to_ordinal)
(TestMemoryTable.test_row_oridnal_to_id): New test for the new
table methods and the new arguments

* test/test_dbf_table.py
(TestDBFTable.test_read_row_as_dict_row_count_mode)
(TestDBFTable.test_read_value_row_count_mode)
(TestDBFTable.test_row_id_to_ordinal)
(TestDBFTable.test_row_oridnal_to_id): New test for the new table
methods and the new arguments

1 # Copyright (c) 2002, 2003 by Intevation GmbH
2 # Authors:
3 # Bernhard Herzog <[email protected]>
4 #
5 # This program is free software under the GPL (>=v2)
6 # Read the file COPYING coming with Thuban for details.
7
8 """
9 Test the Transient DB classes
10 """
11
12 __version__ = "$Revision$"
13 # $Source$
14 # $Id$
15
16 import os
17 import unittest
18
19 import support
20 support.initthuban()
21
22 import dbflib
23 from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25 from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26 TransientJoinedTable, AutoTransientTable
27
28
class TestTransientTable(unittest.TestCase, support.FileTestMixin):
    """Tests for the table classes backed by a TransientDatabase."""

    def setUp(self):
        """Create a fresh transient database as self.transientdb.

        A database file and a journal file left over under the same
        name are removed before the database is created.
        """
        db_file = self.temp_file_name("transient_table.sqlite")
        if os.path.exists(db_file):
            os.remove(db_file)
        journal_file = db_file + "-journal"
        if os.path.exists(journal_file):
            # A single, parenthesised argument prints the same text
            # whether print is a statement or a function.
            print(" ".join(("removing journal", journal_file)))
            os.remove(journal_file)
        self.transientdb = TransientDatabase(db_file)

    def tearDown(self):
        """Close the transient database created by setUp."""
        self.transientdb.close()

    def run_iceland_political_tests(self, table):
        """Run some tests on table

        Assume that table holds the data of the
        ../Data/iceland/political.dbf sample file.
        """
        self.assertEqual(table.NumRows(), 156)
        self.assertEqual(table.NumColumns(), 8)

        # Check one column of each of the possible field types.
        columns = table.Columns()
        for index, name, field_type in ((0, 'AREA', FIELDTYPE_DOUBLE),
                                        (3, 'PONET_ID', FIELDTYPE_INT),
                                        (6, 'POPYCOUN', FIELDTYPE_STRING)):
            self.assertEqual(columns[index].name, name)
            self.assertEqual(columns[index].type, field_type)

        # HasColumn accepts both names and indices ...
        self.failUnless(table.HasColumn("AREA"))
        self.failUnless(table.HasColumn(1))
        # ... and is false for non-existing columns.
        self.failIf(table.HasColumn("non_existing_name"))
        self.failIf(table.HasColumn(100))

        # Reading rows and values, addressing the row by id first and
        # by ordinal second. Both are expected to yield the same data.
        expected_row = {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
                        'AREA': 19.462,
                        'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
                        'POPYREG': '1',
                        'PONET_ID': 145}
        self.assertEqual(table.ReadRowAsDict(144), expected_row)
        self.assertEqual(table.ReadRowAsDict(144, row_is_ordinal=1),
                         expected_row)
        self.assertEqual(table.ReadValue(144, "AREA"), 19.462)
        self.assertEqual(table.ReadValue(144, 3), 145)
        self.assertEqual(table.ReadValue(144, "AREA", row_is_ordinal=1),
                         19.462)
        self.assertEqual(table.ReadValue(144, 3, row_is_ordinal=1), 145)

        self.assertEqual(table.RowIdToOrdinal(23), 23)
        self.assertEqual(table.RowOrdinalToId(23), 23)

        # ValueRange may induce a copy to the transient database.
        # Therefore it comes after the checks above, so that running
        # this method twice shows whether the other methods still work
        # after the copy.
        self.assertEqual(table.ValueRange("AREA"), (0.0, 19.462))

        ponet_ids = table.UniqueValues("PONET_ID")
        ponet_ids.sort()
        self.assertEqual(ponet_ids, list(range(1, 157)))

    def test_transient_table(self):
        """Test TransientTable(dbftable)

        The TransientTable should copy the data to the
        TransientDatabase.
        """
        dbf_path = os.path.join("..", "Data", "iceland", "political.dbf")
        orig_table = DBFTable(dbf_path)
        table = TransientTable(self.transientdb, orig_table)
        self.run_iceland_political_tests(table)

        # The transient_table method should return the table itself
        self.assert_(table is table.transient_table())

        # The title is simply copied over from the original table
        self.assertEqual(table.Title(), orig_table.Title())

        # The TransientTable class itself doesn't implement the
        # Dependencies method, so we don't test it.

    def test_auto_transient_table(self):
        """Test AutoTransientTable(dbftable)

        The AutoTransientTable should copy the data to the
        TransientDatabase on demand.
        """
        dbf_path = os.path.join("..", "Data", "iceland", "political.dbf")
        orig_table = DBFTable(dbf_path)
        table = AutoTransientTable(self.transientdb, orig_table)

        # Run the tests twice so that they are executed once while the
        # data has not been copied to the transient db yet and once
        # after it has been. This assumes that
        # run_iceland_political_tests calls, near its end, at least one
        # method that copies to the transient db.
        self.run_iceland_political_tests(table)
        self.run_iceland_political_tests(table)

    def test_auto_transient_table_query(self):
        """Test AutoTransientTable.SimpleQuery()"""
        dbf_path = os.path.join("..", "Data", "iceland", "political.dbf")
        orig_table = DBFTable(dbf_path)
        table = AutoTransientTable(self.transientdb, orig_table)

        # Only a simple test here. The AutoTransientTable simply
        # delegates to its transient table so it should be OK that the
        # real test for it is in test_transient_table_query. However,
        # it's important to check that the column handling works
        # correctly because the AutoTransientTable and its underlying
        # transient table use different column object types.
        large_areas = table.SimpleQuery(table.Column("AREA"), ">", 10.0)
        self.assertEqual(large_areas, [144])

        # test using a Column object as the right parameter
        matching = table.SimpleQuery(table.Column("POPYTYPE"), "==",
                                     table.Column("POPYREG"))
        self.assertEqual(matching, list(range(156)))

    def test_auto_transient_table_dependencies(self):
        """Test AutoTransientTable.Dependencies()"""
        dbf_path = os.path.join("..", "Data", "iceland", "political.dbf")
        orig_table = DBFTable(dbf_path)
        table = AutoTransientTable(self.transientdb, orig_table)
        self.assertEqual(table.Dependencies(), (orig_table,))

    def test_auto_transient_table_title(self):
        """Test AutoTransientTable.Title()"""
        dbf_path = os.path.join("..", "Data", "iceland", "political.dbf")
        orig_table = DBFTable(dbf_path)
        table = AutoTransientTable(self.transientdb, orig_table)
        # The title is of course the same as that of the original table
        self.assertEqual(table.Title(), orig_table.Title())

    def test_transient_joined_table(self):
        """Test TransientJoinedTable"""
        type_columns = [("type", FIELDTYPE_STRING),
                        ("code", FIELDTYPE_INT)]
        type_rows = [("OTHER/UNKNOWN", 0),
                     ("RUINS", 1),
                     ("FARM", 2),
                     ("BUILDING", 3),
                     ("HUT", 4),
                     ("LIGHTHOUSE", 5)]
        simple = MemoryTable(type_columns, type_rows)
        auto = AutoTransientTable(self.transientdb, simple)
        filename = os.path.join("..", "Data", "iceland",
                                "cultural_landmark-point.dbf")
        landmarks = AutoTransientTable(self.transientdb, DBFTable(filename))

        table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
                                     auto, "type")

        self.assertEqual(table.NumRows(), 34)
        self.assertEqual(table.NumColumns(), 8)
        for index, field_type, name in ((0, FIELDTYPE_DOUBLE, 'AREA'),
                                        (7, FIELDTYPE_INT, 'code'),
                                        (4, FIELDTYPE_STRING, 'CLPTLABEL')):
            self.assertEqual(table.Column(index).type, field_type)
            self.assertEqual(table.Column(index).name, name)

        # HasColumn accepts both names and indices ...
        self.failUnless(table.HasColumn("AREA"))
        self.failUnless(table.HasColumn(1))
        # ... and is false for non-existing columns.
        self.failIf(table.HasColumn("non_existing_name"))
        self.failIf(table.HasColumn(100))

        # Reading rows and values
        expected_row = {'PERIMETER': 0.0, 'CLPOINT_': 23,
                        'AREA': 0.0, 'CLPTLABEL': 'RUINS',
                        'CLPOINT_ID': 38, 'CLPTFLAG': 0,
                        'code': 1, 'type': 'RUINS'}
        self.assertEqual(table.ReadRowAsDict(22), expected_row)
        self.assertEqual(table.ReadValue(22, "type"), 'RUINS')
        self.assertEqual(table.ReadValue(22, 7), 1)
        self.assertEqual(table.ReadValue(22, "type", row_is_ordinal=1),
                         "RUINS")
        self.assertEqual(table.ReadValue(22, 7, row_is_ordinal=1), 1)
        self.assertEqual(table.RowIdToOrdinal(23), 23)
        self.assertEqual(table.RowOrdinalToId(23), 23)

        # The transient_table method should return the table itself
        self.assert_(table is table.transient_table())

        # The TransientJoinedTable depends on both input tables
        self.assertEqual(table.Dependencies(), (landmarks, auto))

        # The title is constructed from the titles of the input tables.
        expected_title = "Join of %s and %s" % (landmarks.Title(),
                                                auto.Title())
        self.assertEqual(table.Title(), expected_title)

    def test_transient_joined_table_same_column_name(self):
        """Test TransientJoinedTable join on columns with same name

        The transient DB maps the column names used by the tables to
        another set of names used only inside the SQLite database. There
        was a bug in the way this mapping was used when joining on
        fields with the same names in both tables so that the joined
        table ended up joining on the same column in the same table.
        """
        mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
                                    [(i,) for i in range(4)])
        stretches = AutoTransientTable(self.transientdb, mem_stretches)

        mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
                                      ("stretch_id", FIELDTYPE_INT)],
                                     [(1, 0), (2, 3)])
        discharges = AutoTransientTable(self.transientdb, mem_discharges)

        table = TransientJoinedTable(self.transientdb,
                                     stretches, "stretch_id",
                                     discharges, "stretch_id",
                                     outer_join=True)

        self.assertEqual(table.NumRows(), 4)
        self.assertEqual(table.NumColumns(), 3)

        # HasColumn
        self.failUnless(table.HasColumn("stretch_id"))
        self.failUnless(table.HasColumn("disch_id"))

    def test_transient_joined_table_with_equal_column_names(self):
        """Test TransientJoinedTable join on tables with equal column names

        If a name collision occurs for the field names, underscores are
        appended until the collision is resolved.
        """
        mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
                                     ("name", FIELDTYPE_INT)],
                                    [(0, 10), (1, 11), (2, 12), (3, 13)])
        stretches = AutoTransientTable(self.transientdb, mem_stretches)

        mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
                                      ("stretch_id", FIELDTYPE_INT),
                                      ("name", FIELDTYPE_INT)],
                                     [(1, 0, 1), (2, 3, 2)])
        discharges = AutoTransientTable(self.transientdb, mem_discharges)

        table = TransientJoinedTable(self.transientdb,
                                     stretches, "stretch_id",
                                     discharges, "stretch_id",
                                     outer_join=True)

        self.assertEqual(table.NumRows(), 4)
        self.assertEqual(table.NumColumns(), 5)

        # The colliding names of the right table get an underscore.
        column_names = [c.name for c in table.Columns()]
        self.assertEqual(column_names,
                         ["stretch_id", "name", "disch_id", "stretch_id_",
                          "name_"])

    def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
        """Test TransientJoinedTable name-collisions do not modify in place

        The name collision work-around by appending underscores
        accidentally modified the column objects in place. We therefore
        do two joins in reverse order to detect this: The first join
        will lead to a modified name in the column object of the right
        table which is then used as the left table in the second join so
        the underscored name will appear before the non-underscored one
        in the list of column names after the second join.
        """
        column_spec = [("stretch_id", FIELDTYPE_INT),
                       ("name", FIELDTYPE_INT)]

        mem1 = MemoryTable(list(column_spec),
                           [(0, 10), (1, 11), (2, 12), (3, 13)])
        table1 = AutoTransientTable(self.transientdb, mem1)

        mem2 = MemoryTable(list(column_spec),
                           [(0, 10), (1, 11), (2, 12), (3, 13)])
        table2 = AutoTransientTable(self.transientdb, mem2)

        # The same name is deliberately rebound: only the result of the
        # second join is inspected.
        table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
                                     table2, "stretch_id", outer_join=True)
        table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
                                     table1, "stretch_id", outer_join=True)

        column_names = [c.name for c in table.Columns()]
        self.assertEqual(column_names,
                         ["stretch_id", "name", "stretch_id_", "name_"])

    def test_transient_table_read_twice(self):
        """Test TransientTable.ReadRowAsDict() reading the same record twice"""
        simple = MemoryTable([("type", FIELDTYPE_STRING),
                              ("code", FIELDTYPE_INT)],
                             [("OTHER/UNKNOWN", 0),
                              ("RUINS", 1),
                              ("FARM", 2),
                              ("BUILDING", 3),
                              ("HUT", 4),
                              ("LIGHTHOUSE", 5)])
        table = TransientTable(self.transientdb, simple)

        # There was a bug where reading the same record twice would
        # raise an exception in the second call because of an
        # uninitialized local variable, so for passing the test it's
        # enough if reading simply succeeds. OTOH, while we're at it we
        # might as well check whether the results are equal anyway :)
        first_read = table.ReadRowAsDict(3)
        second_read = table.ReadRowAsDict(3)
        self.assertEqual(first_read, second_read)

    def test_transient_table_query(self):
        """Test TransientTable.SimpleQuery()"""
        simple = MemoryTable([("type", FIELDTYPE_STRING),
                              ("value", FIELDTYPE_DOUBLE),
                              ("code", FIELDTYPE_INT)],
                             [("OTHER/UNKNOWN", -1.5, 11),
                              ("RUINS", 0.0, 1),
                              ("FARM", 3.141, 2),
                              ("BUILDING", 2.5, 3),
                              ("HUT", 1e6, 4),
                              ("LIGHTHOUSE", -0.01, 5)])
        table = TransientTable(self.transientdb, simple)

        # A column and a value:
        # (column index, operator, value, expected result)
        value_queries = [(0, "==", "RUINS", [1]),
                         (2, "!=", 2, [0, 1, 3, 4, 5]),
                         (1, "<", 1.0, [0, 1, 5]),
                         (1, "<=", -1.5, [0]),
                         (2, ">", 3, [0, 4, 5]),
                         (2, ">=", 3, [0, 3, 4, 5])]
        for column_index, op, value, expected in value_queries:
            found = table.SimpleQuery(table.Column(column_index), op, value)
            self.assertEqual(found, expected)

        # Two columns as operands
        found = table.SimpleQuery(table.Column(1), "<=", table.Column(2))
        self.assertEqual(found, [0, 1, 3, 5])

        # Test whether invalid operators raise a ValueError
        self.assertRaises(ValueError,
                          table.SimpleQuery,
                          table.Column(1), "<<", table.Column(2))

    def test_transienttable_to_dbf(self):
        """Test table_to_dbf() with a TransientTable"""
        memtable = MemoryTable([("type", FIELDTYPE_STRING),
                                ("value", FIELDTYPE_DOUBLE),
                                ("code", FIELDTYPE_INT)],
                               [("UNKNOWN", 0.0, 0),
                                ("Foo", 0.5, -1),
                                ("Foo", 1.0/256, 100),
                                ("bar", 1e10, 17)])
        table = TransientTable(self.transientdb, memtable)
        filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
        table_to_dbf(table, filename)

        # Read the written file back and compare it with the input.
        dbf = dbflib.DBFFile(filename)
        self.assertEqual(dbf.read_record(2),
                         {'code': 100, 'type': 'Foo', 'value': 0.00390625})
        self.assertEqual(dbf.field_count(), 3)
        self.assertEqual(dbf.record_count(), 4)
        expected_fields = [(dbflib.FTString, "type", 7, 0),
                           (dbflib.FTDouble, "value", 24, 12),
                           (dbflib.FTInteger, "code", 3, 0)]
        for index in range(len(expected_fields)):
            self.assertEqual(dbf.field_info(index), expected_fields[index])
405
406
407
if __name__ == "__main__":
    # Executed as a script: hand over to the runner in the support module.
    support.run_tests()

Properties

Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26