/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Contents of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory | Revision Log


Revision 1924 - (show annotations)
Fri Nov 7 12:07:11 2003 UTC (21 years, 4 months ago) by bh
Original Path: trunk/thuban/test/test_transientdb.py
File MIME type: text/x-python
File size: 18613 byte(s)
(TestTransientTable.test_auto_transient_table): Make sure that the
data is copied to the transient database at some point.

1 # Copyright (c) 2002, 2003 by Intevation GmbH
2 # Authors:
3 # Bernhard Herzog <[email protected]>
4 #
5 # This program is free software under the GPL (>=v2)
6 # Read the file COPYING coming with Thuban for details.
7
8 """
9 Test the Transient DB classes
10 """
11
12 __version__ = "$Revision$"
13 # $Source$
14 # $Id$
15
16 import os
17 import unittest
18
19 import support
20 support.initthuban()
21
22 import dbflib
23 from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25 from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26 TransientJoinedTable, AutoTransientTable
27
28
class TestTransientTable(unittest.TestCase, support.FileTestMixin):

    """Test cases for the table classes that use a TransientDatabase

    setUp creates a TransientDatabase as self.transientdb for every
    test and tearDown closes it again.  The tests exercise
    TransientTable, AutoTransientTable and TransientJoinedTable.
    """

    def setUp(self):
        """Create a transient database as self.transientdb

        An already existing file with the database's name and its
        "-journal" companion file are removed before the database is
        created.
        """
        filename = self.temp_file_name("transient_table.sqlite")
        if os.path.exists(filename):
            os.remove(filename)
        journal = filename + "-journal"
        if os.path.exists(journal):
            # A single parenthesized argument prints the same text
            # whether print is a statement or a function.
            print("removing journal %s" % journal)
            os.remove(journal)
        self.transientdb = TransientDatabase(filename)

    def tearDown(self):
        """Close the transient database created by setUp"""
        self.transientdb.close()

    def open_iceland_table(self, basename):
        """Return a DBFTable for the file basename of the iceland sample data

        The path is relative to the current directory, just like the
        paths of the other sample data used by the tests.
        """
        return DBFTable(os.path.join("..", "Data", "iceland", basename))

    def run_iceland_political_tests(self, table):
        """Run some tests on table

        Assume that table holds the data of the file
        ../Data/iceland/political.dbf sample file.
        """
        self.assertEqual(table.NumRows(), 156)
        self.assertEqual(table.NumColumns(), 8)

        # Check one each of the possible field types.
        columns = table.Columns()
        self.assertEqual(columns[0].name, 'AREA')
        self.assertEqual(columns[0].type, FIELDTYPE_DOUBLE)
        self.assertEqual(columns[3].name, 'PONET_ID')
        self.assertEqual(columns[3].type, FIELDTYPE_INT)
        self.assertEqual(columns[6].name, 'POPYCOUN')
        self.assertEqual(columns[6].type, FIELDTYPE_STRING)

        # HasColumn
        self.failUnless(table.HasColumn("AREA"))
        self.failUnless(table.HasColumn(1))
        # HasColumn for non-existing columns
        self.failIf(table.HasColumn("non_existing_name"))
        self.failIf(table.HasColumn(100))

        # Reading rows and values.  The row is expected to be the same
        # regardless of whether it's addressed by id or by ordinal.
        expected_row = {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
                        'AREA': 19.462,
                        'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
                        'POPYREG': '1',
                        'PONET_ID': 145}
        self.assertEqual(table.ReadRowAsDict(144), expected_row)
        self.assertEqual(table.ReadRowAsDict(144, row_is_ordinal=1),
                         expected_row)
        self.assertEqual(table.ReadValue(144, "AREA"), 19.462)
        self.assertEqual(table.ReadValue(144, 3), 145)
        self.assertEqual(table.ReadValue(144, "AREA", row_is_ordinal=1),
                         19.462)
        self.assertEqual(table.ReadValue(144, 3, row_is_ordinal=1), 145)

        self.assertEqual(table.RowIdToOrdinal(23), 23)
        self.assertEqual(table.RowOrdinalToId(23), 23)

        # ValueRange may induce a copy to the transient database.
        # Therefore we put it last so that we can execute this method
        # twice to check whether the other methods still work after the
        # copy
        self.assertEqual(table.ValueRange("AREA"), (0.0, 19.462))

        unique = table.UniqueValues("PONET_ID")
        unique.sort()
        # list() so that the comparison doesn't rely on range
        # returning a list.
        self.assertEqual(unique, list(range(1, 157)))

    def test_transient_table(self):
        """Test TransientTable(dbftable)

        The TransientTable should copy the data to the
        TransientDatabase.
        """
        orig_table = self.open_iceland_table("political.dbf")
        table = TransientTable(self.transientdb, orig_table)
        self.run_iceland_political_tests(table)

        # The transient_table method should return the table itself
        self.assert_(table is table.transient_table())

        # The title is simply copied over from the original table
        self.assertEqual(table.Title(), orig_table.Title())

        # The TransientTable class itself doesn't implement the
        # Dependencies method, so we don't test it.

    def test_auto_transient_table(self):
        """Test AutoTransientTable(dbftable)

        The AutoTransientTable should copy the data to the
        TransientDatabase on demand.
        """
        orig_table = self.open_iceland_table("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)

        # Run the tests twice so that we execute them once when the data
        # has not been copied to the transient db yet and once when it
        # has.
        self.run_iceland_political_tests(table)
        # At this point the data has probably not been copied to the
        # transient DB yet, so we force it by calling the
        # transient_table method.
        table.transient_table()

        # Run the tests again.
        self.run_iceland_political_tests(table)

    def test_auto_transient_table_query(self):
        """Test AutoTransientTable.SimpleQuery()"""
        orig_table = self.open_iceland_table("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)
        # Only a simple test here. The AutoTransientTable simply
        # delegates to its transient table so it should be OK that the
        # real test for it is in test_transient_table_query. However,
        # it's important to check that the column handling works
        # correctly because the AutoTransientTable and its underlying
        # transient table use different column object types.
        self.assertEqual(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
                         [144])

        # test using a Column object as the right parameter
        self.assertEqual(table.SimpleQuery(table.Column("POPYTYPE"),
                                           "==",
                                           table.Column("POPYREG")),
                         list(range(156)))

    def test_auto_transient_table_dependencies(self):
        """Test AutoTransientTable.Dependencies()"""
        orig_table = self.open_iceland_table("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)
        self.assertEqual(table.Dependencies(), (orig_table,))

    def test_auto_transient_table_title(self):
        """Test AutoTransientTable.Title()"""
        orig_table = self.open_iceland_table("political.dbf")
        table = AutoTransientTable(self.transientdb, orig_table)
        # The title is of course the same as that of the original table
        self.assertEqual(table.Title(), orig_table.Title())

    def test_transient_joined_table(self):
        """Test TransientJoinedTable"""
        simple = MemoryTable([("type", FIELDTYPE_STRING),
                              ("code", FIELDTYPE_INT)],
                             [("OTHER/UNKNOWN", 0),
                              ("RUINS", 1),
                              ("FARM", 2),
                              ("BUILDING", 3),
                              ("HUT", 4),
                              ("LIGHTHOUSE", 5)])
        auto = AutoTransientTable(self.transientdb, simple)
        landmarks = AutoTransientTable(
            self.transientdb,
            self.open_iceland_table("cultural_landmark-point.dbf"))

        table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
                                     auto, "type")

        self.assertEqual(table.NumRows(), 34)
        self.assertEqual(table.NumColumns(), 8)
        self.assertEqual(table.Column(0).type, FIELDTYPE_DOUBLE)
        self.assertEqual(table.Column(0).name, 'AREA')
        self.assertEqual(table.Column(7).type, FIELDTYPE_INT)
        self.assertEqual(table.Column(7).name, 'code')
        self.assertEqual(table.Column(4).type, FIELDTYPE_STRING)
        self.assertEqual(table.Column(4).name, 'CLPTLABEL')
        # HasColumn
        self.failUnless(table.HasColumn("AREA"))
        self.failUnless(table.HasColumn(1))
        # HasColumn for non-existing columns
        self.failIf(table.HasColumn("non_existing_name"))
        self.failIf(table.HasColumn(100))

        # Reading rows and values
        self.assertEqual(table.ReadRowAsDict(22),
                         {'PERIMETER': 0.0, 'CLPOINT_': 23,
                          'AREA': 0.0, 'CLPTLABEL': 'RUINS',
                          'CLPOINT_ID': 38, 'CLPTFLAG': 0,
                          'code': 1, 'type': 'RUINS'})
        self.assertEqual(table.ReadValue(22, "type"), 'RUINS')
        self.assertEqual(table.ReadValue(22, 7), 1)
        self.assertEqual(table.ReadValue(22, "type", row_is_ordinal=1),
                         "RUINS")
        self.assertEqual(table.ReadValue(22, 7, row_is_ordinal=1), 1)
        self.assertEqual(table.RowIdToOrdinal(23), 23)
        self.assertEqual(table.RowOrdinalToId(23), 23)

        # The transient_table method should return the table itself
        self.assert_(table is table.transient_table())

        # The TransientJoinedTable depends on both input tables
        self.assertEqual(table.Dependencies(), (landmarks, auto))

        # The title is constructed from the titles of the input tables.
        self.assertEqual(table.Title(),
                         "Join of %s and %s" % (landmarks.Title(),
                                                auto.Title()))

    def test_transient_joined_table_same_column_name(self):
        """Test TransientJoinedTable join on columns with same name

        The transient DB maps the column names used by the tables to
        another set of names used only inside the SQLite database. There
        was a bug in the way this mapping was used when joining on
        fields with the same names in both tables so that the joined
        table ended up joining on the same column in the same table.
        """
        mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
                                    [(i,) for i in range(4)])
        stretches = AutoTransientTable(self.transientdb, mem_stretches)

        mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
                                      ("stretch_id", FIELDTYPE_INT)],
                                     [(1, 0), (2, 3)])
        discharges = AutoTransientTable(self.transientdb, mem_discharges)

        table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
                                     discharges, "stretch_id",
                                     outer_join=True)

        self.assertEqual(table.NumRows(), 4)
        self.assertEqual(table.NumColumns(), 3)

        # HasColumn
        self.failUnless(table.HasColumn("stretch_id"))
        self.failUnless(table.HasColumn("disch_id"))

    def test_transient_joined_table_with_equal_column_names(self):
        """Test TransientJoinedTable join on tables with equal column names

        If a name collision occurs for the field names, underscores are
        appended until the collision is resolved.
        """
        mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
                                     ("name", FIELDTYPE_INT)],
                                    [(0, 10), (1, 11), (2, 12), (3, 13)])
        stretches = AutoTransientTable(self.transientdb, mem_stretches)

        mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
                                      ("stretch_id", FIELDTYPE_INT),
                                      ("name", FIELDTYPE_INT)],
                                     [(1, 0, 1), (2, 3, 2)])
        discharges = AutoTransientTable(self.transientdb, mem_discharges)

        table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
                                     discharges, "stretch_id",
                                     outer_join=True)

        self.assertEqual(table.NumRows(), 4)
        self.assertEqual(table.NumColumns(), 5)

        # The colliding names of the right table get an underscore
        # appended
        self.assertEqual([c.name for c in table.Columns()],
                         ["stretch_id", "name", "disch_id", "stretch_id_",
                          "name_"])

    def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
        """Test TransientJoinedTable name-collisions do not modify in place

        The name collision work-around by appending underscores
        accidentally modified the column objects in place. We do two
        joins therefore in reverse order to detect this: The first join
        will lead to a modified name in the column object of the right
        table which is then used as the left table in the second join so
        the underscored name will appear before the non-underscored one
        in the list of column names after the second join.
        """
        mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
                            ("name", FIELDTYPE_INT)],
                           [(0, 10), (1, 11), (2, 12), (3, 13)])
        table1 = AutoTransientTable(self.transientdb, mem1)

        mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
                            ("name", FIELDTYPE_INT)],
                           [(0, 10), (1, 11), (2, 12), (3, 13)])
        table2 = AutoTransientTable(self.transientdb, mem2)

        # The result of the first join is deliberately overwritten.
        # Only the effect it may have had on the columns of table2
        # matters.
        table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
                                     table2, "stretch_id", outer_join=True)
        table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
                                     table1, "stretch_id", outer_join=True)

        self.assertEqual([c.name for c in table.Columns()],
                         ["stretch_id", "name", "stretch_id_", "name_"])

    def test_transient_table_read_twice(self):
        """Test TransientTable.ReadRowAsDict() reading the same record twice"""
        simple = MemoryTable([("type", FIELDTYPE_STRING),
                              ("code", FIELDTYPE_INT)],
                             [("OTHER/UNKNOWN", 0),
                              ("RUINS", 1),
                              ("FARM", 2),
                              ("BUILDING", 3),
                              ("HUT", 4),
                              ("LIGHTHOUSE", 5)])
        table = TransientTable(self.transientdb, simple)

        # There was a bug where reading the same record twice would
        # raise an exception in the second call because of an
        # uninitialized local variable, so for passing the test it's
        # enough if reading simply succeeds. OTOH, while we're at it we
        # might as well check whether the results are equal anyway :)
        result1 = table.ReadRowAsDict(3)
        result2 = table.ReadRowAsDict(3)
        self.assertEqual(result1, result2)

    def test_transient_table_query(self):
        """Test TransientTable.SimpleQuery()"""
        simple = MemoryTable([("type", FIELDTYPE_STRING),
                              ("value", FIELDTYPE_DOUBLE),
                              ("code", FIELDTYPE_INT)],
                             [("OTHER/UNKNOWN", -1.5, 11),
                              ("RUINS", 0.0, 1),
                              ("FARM", 3.141, 2),
                              ("BUILDING", 2.5, 3),
                              ("HUT", 1e6, 4),
                              ("LIGHTHOUSE", -0.01, 5)])
        table = TransientTable(self.transientdb, simple)

        # A column and a value
        self.assertEqual(table.SimpleQuery(table.Column(0), "==", "RUINS"),
                         [1])
        self.assertEqual(table.SimpleQuery(table.Column(2), "!=", 2),
                         [0, 1, 3, 4, 5])
        self.assertEqual(table.SimpleQuery(table.Column(1), "<", 1.0),
                         [0, 1, 5])
        self.assertEqual(table.SimpleQuery(table.Column(1), "<=", -1.5),
                         [0])
        self.assertEqual(table.SimpleQuery(table.Column(2), ">", 3),
                         [0, 4, 5])
        self.assertEqual(table.SimpleQuery(table.Column(2), ">=", 3),
                         [0, 3, 4, 5])

        # Two columns as operands
        self.assertEqual(table.SimpleQuery(table.Column(1),
                                           "<=", table.Column(2)),
                         [0, 1, 3, 5])

        # Test whether invalid operators raise a ValueError
        self.assertRaises(ValueError,
                          table.SimpleQuery,
                          table.Column(1), "<<", table.Column(2))

    def test_transienttable_to_dbf(self):
        """Test table_to_dbf with a TransientTable as the source table"""
        memtable = MemoryTable([("type", FIELDTYPE_STRING),
                                ("value", FIELDTYPE_DOUBLE),
                                ("code", FIELDTYPE_INT)],
                               [("UNKNOWN", 0.0, 0),
                                ("Foo", 0.5, -1),
                                ("Foo", 1.0/256, 100),
                                ("bar", 1e10, 17)])
        table = TransientTable(self.transientdb, memtable)
        filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
        table_to_dbf(table, filename)

        # Read the written file back with dbflib directly.  Close it
        # even if an assertion fails so that the file handle does not
        # stay open for the rest of the test run.
        dbf = dbflib.DBFFile(filename)
        try:
            self.assertEqual(dbf.read_record(2),
                             {'code': 100, 'type': 'Foo',
                              'value': 0.00390625})
            self.assertEqual(dbf.field_count(), 3)
            self.assertEqual(dbf.record_count(), 4)
            self.assertEqual(dbf.field_info(0),
                             (dbflib.FTString, "type", 7, 0))
            self.assertEqual(dbf.field_info(1),
                             (dbflib.FTDouble, "value", 24, 12))
            self.assertEqual(dbf.field_info(2),
                             (dbflib.FTInteger, "code", 3, 0))
        finally:
            dbf.close()
411
412
413
# Run the tests of this module when the file is executed as a script
# rather than imported.
if __name__ == "__main__":
    support.run_tests()

Properties

Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26