/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

trunk/thuban/test/test_transientdb.py revision 765 by bh, Tue Apr 29 12:42:14 2003 UTC branches/WIP-pyshapelib-bramz/test/test_transientdb.py revision 2734 by bramz, Thu Mar 1 12:42:59 2007 UTC
# Line 1  Line 1 
1  # Copyright (c) 2002, 2003 by Intevation GmbH  # Copyright (c) 2002, 2003, 2006 by Intevation GmbH
2  # Authors:  # Authors:
3  # Bernhard Herzog <[email protected]>  # Bernhard Herzog <[email protected]>
4  #  #
# Line 19  import unittest Line 19  import unittest
19  import support  import support
20  support.initthuban()  support.initthuban()
21    
22  from Thuban.Model.table import DBFTable, FIELDTYPE_STRING, FIELDTYPE_INT  import dbflib
23    from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24                                   FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
27    
28    
 class SimpleTable:  
   
     """Very simple table implementation that operates on a list of tuples"""  
   
     def __init__(self, fields, data):  
         """Initialize the SimpleTable  
   
         Parameters:  
         fields -- List of (name, field_type) pairs  
         data -- List of tuples, one for each row of data  
         """  
         self.fields = fields  
         self.data = data  
   
     def field_count(self):  
         return len(self.fields)  
   
     def field_info(self, index):  
         name, type = self.fields[index]  
         return (type, name, 0, 0)  
   
     def record_count(self):  
         return len(self.data)  
   
     def read_record(self, index):  
         return dict([(self.fields[i][0], self.data[index][i])  
                       for i in range(len(self.fields))])  
   
   
29  class TestTransientTable(unittest.TestCase, support.FileTestMixin):  class TestTransientTable(unittest.TestCase, support.FileTestMixin):
30    
31      def setUp(self):      def setUp(self):
# Line 70  class TestTransientTable(unittest.TestCa Line 43  class TestTransientTable(unittest.TestCa
43          self.transientdb.close()          self.transientdb.close()
44    
45      def run_iceland_political_tests(self, table):      def run_iceland_political_tests(self, table):
46          """Run some tests on tablte          """Run some tests on table
47    
48          Assume that table holds the data of the file          Assume that table holds the data of the file
49          ../Data/iceland/political.dbf sample file.          ../Data/iceland/political.dbf sample file.
50          """          """
51          self.assertEquals(table.record_count(), 156)          self.assertEquals(table.NumRows(), 156)
52          self.assertEquals(table.field_count(), 8)          self.assertEquals(table.NumColumns(), 8)
53    
54          # Check one each of the possible field types. The width and          # Check one each of the possible field types.
55          # decimal precision is always 0.          columns = table.Columns()
56          self.assertEquals(table.field_info(0), ('double', 'AREA', 0, 0))          self.assertEquals(columns[0].name, 'AREA')
57          self.assertEquals(table.field_info(3), ('int', 'PONET_ID', 0, 0))          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
58          self.assertEquals(table.field_info(6), ('string', 'POPYCOUN', 0, 0))          self.assertEquals(columns[3].name, 'PONET_ID')
59            self.assertEquals(columns[3].type, FIELDTYPE_INT)
60            self.assertEquals(columns[6].name, 'POPYCOUN')
61            self.assertEquals(columns[6].type, FIELDTYPE_STRING)
62    
63            # HasColumn
64            self.failUnless(table.HasColumn("AREA"))
65            self.failUnless(table.HasColumn(1))
66            # HasColumn for non-exisiting columns
67            self.failIf(table.HasColumn("non_existing_name"))
68            self.failIf(table.HasColumn(100))
69    
70          # Read an `interesting' record          # Reading rows and values.
71          self.assertEquals(table.read_record(144),          self.assertEquals(table.ReadRowAsDict(144),
72                              {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
73                               'AREA': 19.462,
74                               'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
75                               'POPYREG': '1',
76                               'PONET_ID': 145})
77            self.assertEquals(table.ReadRowAsDict(144, row_is_ordinal = 1),
78                            {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,                            {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
79                             'AREA': 19.462,                             'AREA': 19.462,
80                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
81                             'POPYREG': '1',                             'POPYREG': '1',
82                             'PONET_ID': 145})                             'PONET_ID': 145})
83            self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
84            self.assertEquals(table.ReadValue(144, 3), 145)
85            self.assertEquals(table.ReadValue(144, "AREA", row_is_ordinal = 1),
86                              19.462)
87            self.assertEquals(table.ReadValue(144, 3, row_is_ordinal = 1), 145)
88    
89          # field_range may induce a copy to the transient database.          self.assertEquals(table.RowIdToOrdinal(23), 23)
90            self.assertEquals(table.RowOrdinalToId(23), 23)
91    
92            # ValueRange may induce a copy to the transient database.
93          # Therefore we put it last so that we can execute this method          # Therefore we put it last so that we can execute this method
94          # twice to check whether the other methods still work after the          # twice to check whether the other methods still work after the
95          # copy          # copy
96          self.assertEquals(table.field_range("AREA"),          self.assertEquals(table.ValueRange("AREA"), (0.0, 19.462))
                           ((0.0, None), (19.462, None)))  
97    
98          unique = table.GetUniqueValues("PONET_ID")          unique = table.UniqueValues("PONET_ID")
99          unique.sort()          unique.sort()
100          self.assertEquals(unique, range(1, 157))          self.assertEquals(unique, range(1, 157))
101    
# Line 117  class TestTransientTable(unittest.TestCa Line 113  class TestTransientTable(unittest.TestCa
113          # The transient_table method should return the table itself          # The transient_table method should return the table itself
114          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
115    
116            # The title is simply copied over from the original table
117            self.assertEquals(table.Title(), orig_table.Title())
118    
119            # The TransientTable class itself doesn't implement the
120            # Dependencies method, so we don't test it.
121    
122    
123      def test_auto_transient_table(self):      def test_auto_transient_table(self):
124          """Test AutoTransientTable(dbftable)          """Test AutoTransientTable(dbftable)
# Line 134  class TestTransientTable(unittest.TestCa Line 136  class TestTransientTable(unittest.TestCa
136          # least one call to a method that copies to the transient db at          # least one call to a method that copies to the transient db at
137          # its end.          # its end.
138          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
139            # At this point the data has probably not been copied to the
140            # transient DB yet, so we force it by calling the
141            # transient_table method.
142            table.transient_table()
143    
144            # Run the tests again.
145          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
146    
147        def test_auto_transient_table_query(self):
148            """Test AutoTransientTable.SimpleQuery()"""
149            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
150                                               "political.dbf"))
151            table = AutoTransientTable(self.transientdb, orig_table)
152            # Only a simple test here. The AutoTransientTable simply
153            # delegates to its transient table so it should be OK that the
154            # real test for it is in test_transient_table_query. However,
155            # it's important to check that the column handling works
156            # correctly because the AutoTransientTable and it's underlying
157            # transient table use different column object types.
158            self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
159                              [144])
160    
161            # test using a Column object as the right parameter
162            self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
163                                                "==",
164                                                table.Column("POPYREG")),
165                              range(156))
166    
167        def test_auto_transient_table_dependencies(self):
168            """Test AutoTransientTable.Dependencies()"""
169            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
170                                               "political.dbf"))
171            table = AutoTransientTable(self.transientdb, orig_table)
172            self.assertEquals(table.Dependencies(), (orig_table,))
173    
174        def test_auto_transient_table_title(self):
175            """Test AutoTransientTable.Title()"""
176            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
177                                               "political.dbf"))
178            table = AutoTransientTable(self.transientdb, orig_table)
179            # The title is of course the same as that of the original table
180            self.assertEquals(table.Title(), orig_table.Title())
181    
182      def test_transient_joined_table(self):      def test_transient_joined_table(self):
183          """Test TransientJoinedTable"""          """Test TransientJoinedTable"""
184          simple = SimpleTable([("type", FIELDTYPE_STRING),          simple = MemoryTable([("type", FIELDTYPE_STRING),
185                                ("code", FIELDTYPE_INT)],                                ("code", FIELDTYPE_INT)],
186                               [("OTHER/UNKNOWN", 0),                               [("OTHER/UNKNOWN", 0),
187                                ("RUINS", 1),                                ("RUINS", 1),
# Line 155  class TestTransientTable(unittest.TestCa Line 197  class TestTransientTable(unittest.TestCa
197          table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",          table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
198                                       auto, "type")                                       auto, "type")
199    
200          self.assertEquals(table.record_count(), 34)          self.assertEquals(table.NumRows(), 34)
201          self.assertEquals(table.field_count(), 8)          self.assertEquals(table.NumColumns(), 8)
202          self.assertEquals(table.field_info(0), ('double', 'AREA', 0, 0))          self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
203          self.assertEquals(table.field_info(7), ('int', 'code', 0, 0))          self.assertEquals(table.Column(0).name, 'AREA')
204          self.assertEquals(table.field_info(4), ('string', 'CLPTLABEL', 0, 0))          self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
205            self.assertEquals(table.Column(7).name, 'code')
206            self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
207            self.assertEquals(table.Column(4).name, 'CLPTLABEL')
208            # HasColumn
209            self.failUnless(table.HasColumn("AREA"))
210            self.failUnless(table.HasColumn(1))
211            # HasColumn for non-exisiting columns
212            self.failIf(table.HasColumn("non_existing_name"))
213            self.failIf(table.HasColumn(100))
214    
215          # Read an `interesting' record          # Reading rows and values
216          self.assertEquals(table.read_record(22),          self.assertEquals(table.ReadRowAsDict(22),
217                            {'PERIMETER': 0.0, 'CLPOINT_': 23,                            {'PERIMETER': 0.0, 'CLPOINT_': 23,
218                             'AREA': 0.0, 'CLPTLABEL': 'RUINS',                             'AREA': 0.0, 'CLPTLABEL': 'RUINS',
219                             'CLPOINT_ID': 38, 'CLPTFLAG': 0,                             'CLPOINT_ID': 38, 'CLPTFLAG': 0,
220                             'code': 1, 'type': 'RUINS'})                             'code': 1, 'type': 'RUINS'})
221            self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
222            self.assertEquals(table.ReadValue(22, 7), 1)
223            self.assertEquals(table.ReadValue(22, "type", row_is_ordinal = 1),
224                              "RUINS")
225            self.assertEquals(table.ReadValue(22, 7, row_is_ordinal = 1), 1)
226            self.assertEquals(table.RowIdToOrdinal(23), 23)
227            self.assertEquals(table.RowOrdinalToId(23), 23)
228    
229          # The transient_table method should return the table itself          # The transient_table method should return the table itself
230          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
231    
232            # The TransientJoinedTable depends on both input tables
233            self.assertEquals(table.Dependencies(), (landmarks, auto))
234    
235            # The title is constructed from the titles of the input tables.
236            self.assertEquals(table.Title(),
237                              "Join of %s and %s" % (landmarks.Title(),
238                                                     auto.Title()))
239    
240    
241        def test_transient_joined_table_same_column_name(self):
242            """Test TransientJoinedTable join on columns with same name
243    
244            The transient DB maps the column names used by the tables to
245            another set of names used only inside the SQLite database. There
246            was a bug in the way this mapping was used when joining on
247            fields with the same names in both tables so that the joined
248            table ended up joining on the same column in the same table.
249            """
250            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
251                                        [(i,) for i in range(4)])
252            stretches = AutoTransientTable(self.transientdb, mem_stretches)
253    
254            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
255                                          ("stretch_id", FIELDTYPE_INT)],
256                                         [(1, 0), (2, 3)])
257            discharges = AutoTransientTable(self.transientdb, mem_discharges)
258    
259            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
260                                         discharges, "stretch_id",
261                                         outer_join = True)
262    
263            self.assertEquals(table.NumRows(), 4)
264            self.assertEquals(table.NumColumns(), 3)
265    
266            # HasColumn
267            self.failUnless(table.HasColumn("stretch_id"))
268            self.failUnless(table.HasColumn("disch_id"))
269    
270    
271        def test_transient_joined_table_with_equal_column_names(self):
272            """Test TransientJoinedTable join on tables with equal column names
273    
274            If a name collision occurs for the field names, underscores are
275            appended as long as any collision is resolved.
276            """
277            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
278                                         ("name", FIELDTYPE_INT)],
279                                        [(0, 10), (1, 11), (2, 12), (3, 13) ])
280            stretches = AutoTransientTable(self.transientdb, mem_stretches)
281    
282            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
283                                          ("stretch_id", FIELDTYPE_INT),
284                                          ("name", FIELDTYPE_INT)],
285                                         [(1, 0, 1), (2, 3, 2)])
286            discharges = AutoTransientTable(self.transientdb, mem_discharges)
287    
288            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
289                                         discharges, "stretch_id",
290                                         outer_join = True)
291    
292            self.assertEquals(table.NumRows(), 4)
293            self.assertEquals(table.NumColumns(), 5)
294    
295            # HasColumn
296            self.assertEquals([c.name for c in table.Columns()],
297                              ["stretch_id", "name", "disch_id", "stretch_id_",
298                               "name_"])
299    
300        def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
301            """Test TransientJoinedTable name-collisions do not modifying in place
302    
303            The name collision work-around by appending underscores
304            accidentally modified the column objects in place. We do two
305            joins therefore in reverse order to detect this: The first join
306            will lead to a modified name in the column object of the right
307            table which is then used as the left table in the second join so
308            the underscored name will appear before the non-underscored one
309            in the list of column names after the second join.
310            """
311            mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
312                                ("name", FIELDTYPE_INT)],
313                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
314            table1 = AutoTransientTable(self.transientdb, mem1)
315    
316            mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
317                                ("name", FIELDTYPE_INT)],
318                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
319            table2 = AutoTransientTable(self.transientdb, mem2)
320    
321            table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
322                                         table2, "stretch_id", outer_join = True)
323            table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
324                                         table1, "stretch_id", outer_join = True)
325    
326            self.assertEquals([c.name for c in table.Columns()],
327                              ["stretch_id", "name", "stretch_id_", "name_"])
328    
329        def test_transient_table_read_twice(self):
330            """Test TransientTable.ReadRowAsDict() reading the same record twice"""
331            simple = MemoryTable([("type", FIELDTYPE_STRING),
332                                  ("code", FIELDTYPE_INT)],
333                                 [("OTHER/UNKNOWN", 0),
334                                  ("RUINS", 1),
335                                  ("FARM", 2),
336                                  ("BUILDING", 3),
337                                  ("HUT", 4),
338                                  ("LIGHTHOUSE", 5)])
339            table = TransientTable(self.transientdb, simple)
340    
341            # There was a bug where reading the same record twice would
342            # raise an exception in the second call because of an
343            # unitialized local variable, so for passing the test it's
344            # enough if reading simply succeeds. OTOH, while we're at it we
345            # might as well check whether the results are equal anyway :)
346            result1 = table.ReadRowAsDict(3)
347            result2 = table.ReadRowAsDict(3)
348            self.assertEquals(result1, result2)
349    
350    
351        def test_transient_table_query(self):
352            """Test TransientTable.SimpleQuery()"""
353            simple = MemoryTable([("type", FIELDTYPE_STRING),
354                                  ("value", FIELDTYPE_DOUBLE),
355                                  ("code", FIELDTYPE_INT)],
356                                 [("OTHER/UNKNOWN", -1.5, 11),
357                                  ("RUINS", 0.0, 1),
358                                  ("FARM", 3.141, 2),
359                                  ("BUILDING", 2.5, 3),
360                                  ("HUT", 1e6, 4),
361                                  ("LIGHTHOUSE", -0.01, 5)])
362            table = TransientTable(self.transientdb, simple)
363    
364            # A column and a value
365            self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
366                              [1])
367            self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
368                              [0, 1, 3, 4, 5])
369            self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
370                              [0, 1, 5])
371            self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
372                              [0])
373            self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
374                              [0, 4, 5])
375            self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
376                              [0, 3, 4, 5])
377    
378            # Two columns as operands
379            self.assertEquals(table.SimpleQuery(table.Column(1),
380                                                "<=", table.Column(2)),
381                              [0, 1, 3, 5])
382    
383            # Test whether invalid operators raise a ValueError
384            self.assertRaises(ValueError,
385                              table.SimpleQuery,
386                              table.Column(1), "<<", table.Column(2))
387    
388        def test_transienttable_to_dbf(self):
389            memtable = MemoryTable([("type", FIELDTYPE_STRING),
390                                    ("value", FIELDTYPE_DOUBLE),
391                                    ("code", FIELDTYPE_INT)],
392                                   [("UNKNOWN", 0.0, 0),
393                                    ("Foo", 0.5, -1),
394                                    ("Foo", 1.0/256, 100),
395                                    ("bar", 1e10, 17)])
396            table = TransientTable(self.transientdb, memtable)
397            filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
398            table_to_dbf(table, filename)
399    
400            dbf = dbflib.DBFFile(filename)
401            self.assertEquals(dbf.read_record(2),
402                              {'code': 100, 'type': 'Foo', 'value': 0.00390625})
403            self.assertEquals(dbf.field_count(), 3)
404            self.assertEquals(dbf.record_count(), 4)
405            self.assertEquals(dbf.field_info(0),
406                              (dbflib.FTString, "type", 7, 0))
407            self.assertEquals(dbf.field_info(1),
408                              (dbflib.FTDouble, "value", 24, 12))
409            self.assertEquals(dbf.field_info(2),
410                              (dbflib.FTInteger, "code", 3, 0))
411    
412    
413    
414  if __name__ == "__main__":  if __name__ == "__main__":

Legend:
Removed from v.765  
changed lines
  Added in v.2734

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26