/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1328 by bh, Tue Jul 1 12:01:58 2003 UTC revision 1924 by bh, Fri Nov 7 12:07:11 2003 UTC
# Line 19  import unittest Line 19  import unittest
19  import support  import support
20  support.initthuban()  support.initthuban()
21    
22    import dbflib
23  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
27    
# Line 73  class TestTransientTable(unittest.TestCa Line 74  class TestTransientTable(unittest.TestCa
74                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
75                             'POPYREG': '1',                             'POPYREG': '1',
76                             'PONET_ID': 145})                             'PONET_ID': 145})
77            self.assertEquals(table.ReadRowAsDict(144, row_is_ordinal = 1),
78                              {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
79                               'AREA': 19.462,
80                               'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
81                               'POPYREG': '1',
82                               'PONET_ID': 145})
83          self.assertEquals(table.ReadValue(144, "AREA"), 19.462)          self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
84          self.assertEquals(table.ReadValue(144, 3), 145)          self.assertEquals(table.ReadValue(144, 3), 145)
85            self.assertEquals(table.ReadValue(144, "AREA", row_is_ordinal = 1),
86                              19.462)
87            self.assertEquals(table.ReadValue(144, 3, row_is_ordinal = 1), 145)
88    
89            self.assertEquals(table.RowIdToOrdinal(23), 23)
90            self.assertEquals(table.RowOrdinalToId(23), 23)
91    
92          # ValueRange may induce a copy to the transient database.          # ValueRange may induce a copy to the transient database.
93          # Therefore we put it last so that we can execute this method          # Therefore we put it last so that we can execute this method
# Line 123  class TestTransientTable(unittest.TestCa Line 136  class TestTransientTable(unittest.TestCa
136          # least one call to a method that copies to the transient db at          # least one call to a method that copies to the transient db at
137          # its end.          # its end.
138          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
139            # At this point the data has probably not been copied to the
140            # transient DB yet, so we force it by calling the
141            # transient_table method.
142            table.transient_table()
143    
144            # Run the tests again.
145          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
146    
147      def test_auto_transient_table_query(self):      def test_auto_transient_table_query(self):
# Line 201  class TestTransientTable(unittest.TestCa Line 220  class TestTransientTable(unittest.TestCa
220                             'code': 1, 'type': 'RUINS'})                             'code': 1, 'type': 'RUINS'})
221          self.assertEquals(table.ReadValue(22, "type"), 'RUINS')          self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
222          self.assertEquals(table.ReadValue(22, 7), 1)          self.assertEquals(table.ReadValue(22, 7), 1)
223            self.assertEquals(table.ReadValue(22, "type", row_is_ordinal = 1),
224                              "RUINS")
225            self.assertEquals(table.ReadValue(22, 7, row_is_ordinal = 1), 1)
226            self.assertEquals(table.RowIdToOrdinal(23), 23)
227            self.assertEquals(table.RowOrdinalToId(23), 23)
228    
229          # The transient_table method should return the table itself          # The transient_table method should return the table itself
230          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
# Line 237  class TestTransientTable(unittest.TestCa Line 261  class TestTransientTable(unittest.TestCa
261                                       outer_join = True)                                       outer_join = True)
262    
263          self.assertEquals(table.NumRows(), 4)          self.assertEquals(table.NumRows(), 4)
264          self.assertEquals(table.NumColumns(), 2)          self.assertEquals(table.NumColumns(), 3)
265    
266          # HasColumn          # HasColumn
267          self.failUnless(table.HasColumn("stretch_id"))          self.failUnless(table.HasColumn("stretch_id"))
268          self.failUnless(table.HasColumn("disch_id"))          self.failUnless(table.HasColumn("disch_id"))
269    
270    
271        def test_transient_joined_table_with_equal_column_names(self):
272            """Test TransientJoinedTable join on tables with equal column names
273    
274            If a name collision occurs for the field names, underscores are
275            appended as long as any collision is resolved.
276            """
277            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
278                                         ("name", FIELDTYPE_INT)],
279                                        [(0, 10), (1, 11), (2, 12), (3, 13) ])
280            stretches = AutoTransientTable(self.transientdb, mem_stretches)
281    
282            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
283                                          ("stretch_id", FIELDTYPE_INT),
284                                          ("name", FIELDTYPE_INT)],
285                                         [(1, 0, 1), (2, 3, 2)])
286            discharges = AutoTransientTable(self.transientdb, mem_discharges)
287    
288            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
289                                         discharges, "stretch_id",
290                                         outer_join = True)
291    
292            self.assertEquals(table.NumRows(), 4)
293            self.assertEquals(table.NumColumns(), 5)
294    
295            # HasColumn
296            self.assertEquals([c.name for c in table.Columns()],
297                              ["stretch_id", "name", "disch_id", "stretch_id_",
298                               "name_"])
299    
300        def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
301            """Test TransientJoinedTable name-collisions do not modifying in place
302    
303            The name collision work-around by appending underscores
304            accidentally modified the column objects in place. We do two
305            joins therefore in reverse order to detect this: The first join
306            will lead to a modified name in the column object of the right
307            table which is then used as the left table in the second join so
308            the underscored name will appear before the non-underscored one
309            in the list of column names after the second join.
310            """
311            mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
312                                ("name", FIELDTYPE_INT)],
313                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
314            table1 = AutoTransientTable(self.transientdb, mem1)
315    
316            mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
317                                ("name", FIELDTYPE_INT)],
318                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
319            table2 = AutoTransientTable(self.transientdb, mem2)
320    
321            table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
322                                         table2, "stretch_id", outer_join = True)
323            table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
324                                         table1, "stretch_id", outer_join = True)
325    
326            self.assertEquals([c.name for c in table.Columns()],
327                              ["stretch_id", "name", "stretch_id_", "name_"])
328    
329      def test_transient_table_read_twice(self):      def test_transient_table_read_twice(self):
330          """Test TransientTable.ReadRowAsDict() reading the same record twice"""          """Test TransientTable.ReadRowAsDict() reading the same record twice"""
331          simple = MemoryTable([("type", FIELDTYPE_STRING),          simple = MemoryTable([("type", FIELDTYPE_STRING),
# Line 303  class TestTransientTable(unittest.TestCa Line 385  class TestTransientTable(unittest.TestCa
385                            table.SimpleQuery,                            table.SimpleQuery,
386                            table.Column(1), "<<", table.Column(2))                            table.Column(1), "<<", table.Column(2))
387    
388        def test_transienttable_to_dbf(self):
389            memtable = MemoryTable([("type", FIELDTYPE_STRING),
390                                    ("value", FIELDTYPE_DOUBLE),
391                                    ("code", FIELDTYPE_INT)],
392                                   [("UNKNOWN", 0.0, 0),
393                                    ("Foo", 0.5, -1),
394                                    ("Foo", 1.0/256, 100),
395                                    ("bar", 1e10, 17)])
396            table = TransientTable(self.transientdb, memtable)
397            filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
398            table_to_dbf(table, filename)
399    
400            dbf = dbflib.DBFFile(filename)
401            self.assertEquals(dbf.read_record(2),
402                              {'code': 100, 'type': 'Foo', 'value': 0.00390625})
403            self.assertEquals(dbf.field_count(), 3)
404            self.assertEquals(dbf.record_count(), 4)
405            self.assertEquals(dbf.field_info(0),
406                              (dbflib.FTString, "type", 7, 0))
407            self.assertEquals(dbf.field_info(1),
408                              (dbflib.FTDouble, "value", 24, 12))
409            self.assertEquals(dbf.field_info(2),
410                              (dbflib.FTInteger, "code", 3, 0))
411    
412    
413    
414  if __name__ == "__main__":  if __name__ == "__main__":
415      support.run_tests()      support.run_tests()

Legend:
Removed from v.1328  
changed lines
  Added in v.1924

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26