/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1332 by frank, Tue Jul 1 15:40:52 2003 UTC revision 1662 by bh, Wed Aug 27 13:51:01 2003 UTC
# Line 19  import unittest Line 19  import unittest
19  import support  import support
20  support.initthuban()  support.initthuban()
21    
22    import dbflib
23  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
27    
# Line 73  class TestTransientTable(unittest.TestCa Line 74  class TestTransientTable(unittest.TestCa
74                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
75                             'POPYREG': '1',                             'POPYREG': '1',
76                             'PONET_ID': 145})                             'PONET_ID': 145})
77            self.assertEquals(table.ReadRowAsDict(144, row_is_ordinal = 1),
78                              {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
79                               'AREA': 19.462,
80                               'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
81                               'POPYREG': '1',
82                               'PONET_ID': 145})
83          self.assertEquals(table.ReadValue(144, "AREA"), 19.462)          self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
84          self.assertEquals(table.ReadValue(144, 3), 145)          self.assertEquals(table.ReadValue(144, 3), 145)
85            self.assertEquals(table.ReadValue(144, "AREA", row_is_ordinal = 1),
86                              19.462)
87            self.assertEquals(table.ReadValue(144, 3, row_is_ordinal = 1), 145)
88    
89            self.assertEquals(table.RowIdToOrdinal(23), 23)
90            self.assertEquals(table.RowOrdinalToId(23), 23)
91    
92          # ValueRange may induce a copy to the transient database.          # ValueRange may induce a copy to the transient database.
93          # Therefore we put it last so that we can execute this method          # Therefore we put it last so that we can execute this method
# Line 201  class TestTransientTable(unittest.TestCa Line 214  class TestTransientTable(unittest.TestCa
214                             'code': 1, 'type': 'RUINS'})                             'code': 1, 'type': 'RUINS'})
215          self.assertEquals(table.ReadValue(22, "type"), 'RUINS')          self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
216          self.assertEquals(table.ReadValue(22, 7), 1)          self.assertEquals(table.ReadValue(22, 7), 1)
217            self.assertEquals(table.ReadValue(22, "type", row_is_ordinal = 1),
218                              "RUINS")
219            self.assertEquals(table.ReadValue(22, 7, row_is_ordinal = 1), 1)
220            self.assertEquals(table.RowIdToOrdinal(23), 23)
221            self.assertEquals(table.RowOrdinalToId(23), 23)
222    
223          # The transient_table method should return the table itself          # The transient_table method should return the table itself
224          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
# Line 237  class TestTransientTable(unittest.TestCa Line 255  class TestTransientTable(unittest.TestCa
255                                       outer_join = True)                                       outer_join = True)
256    
257          self.assertEquals(table.NumRows(), 4)          self.assertEquals(table.NumRows(), 4)
258          self.assertEquals(table.NumColumns(), 2)          self.assertEquals(table.NumColumns(), 3)
259    
260          # HasColumn          # HasColumn
261          self.failUnless(table.HasColumn("stretch_id"))          self.failUnless(table.HasColumn("stretch_id"))
# Line 247  class TestTransientTable(unittest.TestCa Line 265  class TestTransientTable(unittest.TestCa
265      def test_transient_joined_table_with_equal_column_names(self):      def test_transient_joined_table_with_equal_column_names(self):
266          """Test TransientJoinedTable join on tables with equal column names          """Test TransientJoinedTable join on tables with equal column names
267    
268          The join of two tables contains all fields from both tables instead          If a name collision occurs for the field names, underscores are
269          th field the join was performed on. This special field is included          appended as long as any collision is resolved.
         once. If a name collision occurs for the field names, underscores are  
         appended as long as any collision is resolved.        
270          """          """
271          mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),          mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
272                                       ("name", FIELDTYPE_INT)],                                       ("name", FIELDTYPE_INT)],
# Line 268  class TestTransientTable(unittest.TestCa Line 284  class TestTransientTable(unittest.TestCa
284                                       outer_join = True)                                       outer_join = True)
285    
286          self.assertEquals(table.NumRows(), 4)          self.assertEquals(table.NumRows(), 4)
287          self.assertEquals(table.NumColumns(), 4)          self.assertEquals(table.NumColumns(), 5)
288    
289          # HasColumn          # HasColumn
290          self.failUnless(table.HasColumn("stretch_id"))          self.assertEquals([c.name for c in table.Columns()],
291          self.failUnless(table.HasColumn("disch_id"))                            ["stretch_id", "name", "disch_id", "stretch_id_",
292          self.failUnless(table.HasColumn("name"))                             "name_"])
293          self.failUnless(table.HasColumn("name_"))  
294        def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
295            """Test TransientJoinedTable name-collisions do not modifying in place
296    
297            The name collision work-around by appending underscores
298            accidentally modified the column objects in place. We do two
299            joins therefore in reverse order to detect this: The first join
300            will lead to a modified name in the column object of the right
301            table which is then used as the left table in the second join so
302            the underscored name will appear before the non-underscored one
303            in the list of column names after the second join.
304            """
305            mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
306                                ("name", FIELDTYPE_INT)],
307                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
308            table1 = AutoTransientTable(self.transientdb, mem1)
309    
310            mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
311                                ("name", FIELDTYPE_INT)],
312                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
313            table2 = AutoTransientTable(self.transientdb, mem2)
314    
315            table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
316                                         table2, "stretch_id", outer_join = True)
317            table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
318                                         table1, "stretch_id", outer_join = True)
319    
320            self.assertEquals([c.name for c in table.Columns()],
321                              ["stretch_id", "name", "stretch_id_", "name_"])
322    
323      def test_transient_table_read_twice(self):      def test_transient_table_read_twice(self):
324          """Test TransientTable.ReadRowAsDict() reading the same record twice"""          """Test TransientTable.ReadRowAsDict() reading the same record twice"""
# Line 336  class TestTransientTable(unittest.TestCa Line 379  class TestTransientTable(unittest.TestCa
379                            table.SimpleQuery,                            table.SimpleQuery,
380                            table.Column(1), "<<", table.Column(2))                            table.Column(1), "<<", table.Column(2))
381    
382        def test_transienttable_to_dbf(self):
383            memtable = MemoryTable([("type", FIELDTYPE_STRING),
384                                    ("value", FIELDTYPE_DOUBLE),
385                                    ("code", FIELDTYPE_INT)],
386                                   [("UNKNOWN", 0.0, 0),
387                                    ("Foo", 0.5, -1),
388                                    ("Foo", 1.0/256, 100),
389                                    ("bar", 1e10, 17)])
390            table = TransientTable(self.transientdb, memtable)
391            filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
392            table_to_dbf(table, filename)
393    
394            dbf = dbflib.DBFFile(filename)
395            self.assertEquals(dbf.read_record(2),
396                              {'code': 100, 'type': 'Foo', 'value': 0.00390625})
397            self.assertEquals(dbf.field_count(), 3)
398            self.assertEquals(dbf.record_count(), 4)
399            self.assertEquals(dbf.field_info(0),
400                              (dbflib.FTString, "type", 7, 0))
401            self.assertEquals(dbf.field_info(1),
402                              (dbflib.FTDouble, "value", 24, 12))
403            self.assertEquals(dbf.field_info(2),
404                              (dbflib.FTInteger, "code", 3, 0))
405    
406    
407    
408  if __name__ == "__main__":  if __name__ == "__main__":
409      support.run_tests()      support.run_tests()

Legend:
Removed from v.1332  
changed lines
  Added in v.1662

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26