/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1332 by frank, Tue Jul 1 15:40:52 2003 UTC revision 2705 by bernhard, Sun Sep 24 18:55:30 2006 UTC
# Line 1  Line 1 
1  # Copyright (c) 2002, 2003 by Intevation GmbH  # Copyright (c) 2002, 2003, 2006 by Intevation GmbH
2  # Authors:  # Authors:
3  # Bernhard Herzog <[email protected]>  # Bernhard Herzog <[email protected]>
4  #  #
# Line 19  import unittest Line 19  import unittest
19  import support  import support
20  support.initthuban()  support.initthuban()
21    
22    import dbflib
23  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
27    
# Line 42  class TestTransientTable(unittest.TestCa Line 43  class TestTransientTable(unittest.TestCa
43          self.transientdb.close()          self.transientdb.close()
44    
45      def run_iceland_political_tests(self, table):      def run_iceland_political_tests(self, table):
46          """Run some tests on tablte          """Run some tests on table
47    
48          Assume that table holds the data of the file          Assume that table holds the data of the file
49          ../Data/iceland/political.dbf sample file.          ../Data/iceland/political.dbf sample file.
# Line 73  class TestTransientTable(unittest.TestCa Line 74  class TestTransientTable(unittest.TestCa
74                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
75                             'POPYREG': '1',                             'POPYREG': '1',
76                             'PONET_ID': 145})                             'PONET_ID': 145})
77            self.assertEquals(table.ReadRowAsDict(144, row_is_ordinal = 1),
78                              {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
79                               'AREA': 19.462,
80                               'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
81                               'POPYREG': '1',
82                               'PONET_ID': 145})
83          self.assertEquals(table.ReadValue(144, "AREA"), 19.462)          self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
84          self.assertEquals(table.ReadValue(144, 3), 145)          self.assertEquals(table.ReadValue(144, 3), 145)
85            self.assertEquals(table.ReadValue(144, "AREA", row_is_ordinal = 1),
86                              19.462)
87            self.assertEquals(table.ReadValue(144, 3, row_is_ordinal = 1), 145)
88    
89            self.assertEquals(table.RowIdToOrdinal(23), 23)
90            self.assertEquals(table.RowOrdinalToId(23), 23)
91    
92          # ValueRange may induce a copy to the transient database.          # ValueRange may induce a copy to the transient database.
93          # Therefore we put it last so that we can execute this method          # Therefore we put it last so that we can execute this method
# Line 123  class TestTransientTable(unittest.TestCa Line 136  class TestTransientTable(unittest.TestCa
136          # least one call to a method that copies to the transient db at          # least one call to a method that copies to the transient db at
137          # its end.          # its end.
138          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
139            # At this point the data has probably not been copied to the
140            # transient DB yet, so we force it by calling the
141            # transient_table method.
142            table.transient_table()
143    
144            # Run the tests again.
145          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
146    
147      def test_auto_transient_table_query(self):      def test_auto_transient_table_query(self):
# Line 201  class TestTransientTable(unittest.TestCa Line 220  class TestTransientTable(unittest.TestCa
220                             'code': 1, 'type': 'RUINS'})                             'code': 1, 'type': 'RUINS'})
221          self.assertEquals(table.ReadValue(22, "type"), 'RUINS')          self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
222          self.assertEquals(table.ReadValue(22, 7), 1)          self.assertEquals(table.ReadValue(22, 7), 1)
223            self.assertEquals(table.ReadValue(22, "type", row_is_ordinal = 1),
224                              "RUINS")
225            self.assertEquals(table.ReadValue(22, 7, row_is_ordinal = 1), 1)
226            self.assertEquals(table.RowIdToOrdinal(23), 23)
227            self.assertEquals(table.RowOrdinalToId(23), 23)
228    
229          # The transient_table method should return the table itself          # The transient_table method should return the table itself
230          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
# Line 237  class TestTransientTable(unittest.TestCa Line 261  class TestTransientTable(unittest.TestCa
261                                       outer_join = True)                                       outer_join = True)
262    
263          self.assertEquals(table.NumRows(), 4)          self.assertEquals(table.NumRows(), 4)
264          self.assertEquals(table.NumColumns(), 2)          self.assertEquals(table.NumColumns(), 3)
265    
266          # HasColumn          # HasColumn
267          self.failUnless(table.HasColumn("stretch_id"))          self.failUnless(table.HasColumn("stretch_id"))
# Line 247  class TestTransientTable(unittest.TestCa Line 271  class TestTransientTable(unittest.TestCa
271      def test_transient_joined_table_with_equal_column_names(self):      def test_transient_joined_table_with_equal_column_names(self):
272          """Test TransientJoinedTable join on tables with equal column names          """Test TransientJoinedTable join on tables with equal column names
273    
274          The join of two tables contains all fields from both tables instead          If a name collision occurs for the field names, underscores are
275          th field the join was performed on. This special field is included          appended as long as any collision is resolved.
         once. If a name collision occurs for the field names, underscores are  
         appended as long as any collision is resolved.        
276          """          """
277          mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),          mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
278                                       ("name", FIELDTYPE_INT)],                                       ("name", FIELDTYPE_INT)],
# Line 268  class TestTransientTable(unittest.TestCa Line 290  class TestTransientTable(unittest.TestCa
290                                       outer_join = True)                                       outer_join = True)
291    
292          self.assertEquals(table.NumRows(), 4)          self.assertEquals(table.NumRows(), 4)
293          self.assertEquals(table.NumColumns(), 4)          self.assertEquals(table.NumColumns(), 5)
294    
295          # HasColumn          # HasColumn
296          self.failUnless(table.HasColumn("stretch_id"))          self.assertEquals([c.name for c in table.Columns()],
297          self.failUnless(table.HasColumn("disch_id"))                            ["stretch_id", "name", "disch_id", "stretch_id_",
298          self.failUnless(table.HasColumn("name"))                             "name_"])
299          self.failUnless(table.HasColumn("name_"))  
300        def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
301            """Test TransientJoinedTable name-collisions do not modifying in place
302    
303            The name collision work-around by appending underscores
304            accidentally modified the column objects in place. We do two
305            joins therefore in reverse order to detect this: The first join
306            will lead to a modified name in the column object of the right
307            table which is then used as the left table in the second join so
308            the underscored name will appear before the non-underscored one
309            in the list of column names after the second join.
310            """
311            mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
312                                ("name", FIELDTYPE_INT)],
313                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
314            table1 = AutoTransientTable(self.transientdb, mem1)
315    
316            mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
317                                ("name", FIELDTYPE_INT)],
318                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
319            table2 = AutoTransientTable(self.transientdb, mem2)
320    
321            table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
322                                         table2, "stretch_id", outer_join = True)
323            table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
324                                         table1, "stretch_id", outer_join = True)
325    
326            self.assertEquals([c.name for c in table.Columns()],
327                              ["stretch_id", "name", "stretch_id_", "name_"])
328    
329      def test_transient_table_read_twice(self):      def test_transient_table_read_twice(self):
330          """Test TransientTable.ReadRowAsDict() reading the same record twice"""          """Test TransientTable.ReadRowAsDict() reading the same record twice"""
# Line 336  class TestTransientTable(unittest.TestCa Line 385  class TestTransientTable(unittest.TestCa
385                            table.SimpleQuery,                            table.SimpleQuery,
386                            table.Column(1), "<<", table.Column(2))                            table.Column(1), "<<", table.Column(2))
387    
388        def test_transienttable_to_dbf(self):
389            memtable = MemoryTable([("type", FIELDTYPE_STRING),
390                                    ("value", FIELDTYPE_DOUBLE),
391                                    ("code", FIELDTYPE_INT)],
392                                   [("UNKNOWN", 0.0, 0),
393                                    ("Foo", 0.5, -1),
394                                    ("Foo", 1.0/256, 100),
395                                    ("bar", 1e10, 17)])
396            table = TransientTable(self.transientdb, memtable)
397            filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
398            table_to_dbf(table, filename)
399    
400            dbf = dbflib.DBFFile(filename)
401            self.assertEquals(dbf.read_record(2),
402                              {'code': 100, 'type': 'Foo', 'value': 0.00390625})
403            self.assertEquals(dbf.field_count(), 3)
404            self.assertEquals(dbf.record_count(), 4)
405            self.assertEquals(dbf.field_info(0),
406                              (dbflib.FTString, "type", 7, 0))
407            self.assertEquals(dbf.field_info(1),
408                              (dbflib.FTDouble, "value", 24, 12))
409            self.assertEquals(dbf.field_info(2),
410                              (dbflib.FTInteger, "code", 3, 0))
411    
412    
413    
414  if __name__ == "__main__":  if __name__ == "__main__":
415      support.run_tests()      support.run_tests()

Legend:
Removed from v.1332  
changed lines
  Added in v.2705

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26