/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 998 by bh, Thu May 22 19:29:39 2003 UTC revision 1662 by bh, Wed Aug 27 13:51:01 2003 UTC
# Line 19  import unittest Line 19  import unittest
19  import support  import support
20  support.initthuban()  support.initthuban()
21    
22    import dbflib
23  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
27    
# Line 50  class TestTransientTable(unittest.TestCa Line 51  class TestTransientTable(unittest.TestCa
51          self.assertEquals(table.NumRows(), 156)          self.assertEquals(table.NumRows(), 156)
52          self.assertEquals(table.NumColumns(), 8)          self.assertEquals(table.NumColumns(), 8)
53    
54          # Check one each of the possible field types. The width and          # Check one each of the possible field types.
         # decimal precision is always 0.  
55          columns = table.Columns()          columns = table.Columns()
56          self.assertEquals(columns[0].name, 'AREA')          self.assertEquals(columns[0].name, 'AREA')
57          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
# Line 74  class TestTransientTable(unittest.TestCa Line 74  class TestTransientTable(unittest.TestCa
74                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
75                             'POPYREG': '1',                             'POPYREG': '1',
76                             'PONET_ID': 145})                             'PONET_ID': 145})
77            self.assertEquals(table.ReadRowAsDict(144, row_is_ordinal = 1),
78                              {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
79                               'AREA': 19.462,
80                               'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
81                               'POPYREG': '1',
82                               'PONET_ID': 145})
83          self.assertEquals(table.ReadValue(144, "AREA"), 19.462)          self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
84          self.assertEquals(table.ReadValue(144, 3), 145)          self.assertEquals(table.ReadValue(144, 3), 145)
85            self.assertEquals(table.ReadValue(144, "AREA", row_is_ordinal = 1),
86                              19.462)
87            self.assertEquals(table.ReadValue(144, 3, row_is_ordinal = 1), 145)
88    
89            self.assertEquals(table.RowIdToOrdinal(23), 23)
90            self.assertEquals(table.RowOrdinalToId(23), 23)
91    
92          # ValueRange may induce a copy to the transient database.          # ValueRange may induce a copy to the transient database.
93          # Therefore we put it last so that we can execute this method          # Therefore we put it last so that we can execute this method
# Line 202  class TestTransientTable(unittest.TestCa Line 214  class TestTransientTable(unittest.TestCa
214                             'code': 1, 'type': 'RUINS'})                             'code': 1, 'type': 'RUINS'})
215          self.assertEquals(table.ReadValue(22, "type"), 'RUINS')          self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
216          self.assertEquals(table.ReadValue(22, 7), 1)          self.assertEquals(table.ReadValue(22, 7), 1)
217            self.assertEquals(table.ReadValue(22, "type", row_is_ordinal = 1),
218                              "RUINS")
219            self.assertEquals(table.ReadValue(22, 7, row_is_ordinal = 1), 1)
220            self.assertEquals(table.RowIdToOrdinal(23), 23)
221            self.assertEquals(table.RowOrdinalToId(23), 23)
222    
223          # The transient_table method should return the table itself          # The transient_table method should return the table itself
224          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
# Line 215  class TestTransientTable(unittest.TestCa Line 232  class TestTransientTable(unittest.TestCa
232                                                   auto.Title()))                                                   auto.Title()))
233    
234    
235        def test_transient_joined_table_same_column_name(self):
236            """Test TransientJoinedTable join on columns with same name
237    
238            The transient DB maps the column names used by the tables to
239            another set of names used only inside the SQLite database. There
240            was a bug in the way this mapping was used when joining on
241            fields with the same names in both tables so that the joined
242            table ended up joining on the same column in the same table.
243            """
244            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
245                                        [(i,) for i in range(4)])
246            stretches = AutoTransientTable(self.transientdb, mem_stretches)
247    
248            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
249                                          ("stretch_id", FIELDTYPE_INT)],
250                                         [(1, 0), (2, 3)])
251            discharges = AutoTransientTable(self.transientdb, mem_discharges)
252    
253            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
254                                         discharges, "stretch_id",
255                                         outer_join = True)
256    
257            self.assertEquals(table.NumRows(), 4)
258            self.assertEquals(table.NumColumns(), 3)
259    
260            # HasColumn
261            self.failUnless(table.HasColumn("stretch_id"))
262            self.failUnless(table.HasColumn("disch_id"))
263    
264    
265        def test_transient_joined_table_with_equal_column_names(self):
266            """Test TransientJoinedTable join on tables with equal column names
267    
268            If a name collision occurs for the field names, underscores are
269            appended as long as any collision is resolved.
270            """
271            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
272                                         ("name", FIELDTYPE_INT)],
273                                        [(0, 10), (1, 11), (2, 12), (3, 13) ])
274            stretches = AutoTransientTable(self.transientdb, mem_stretches)
275    
276            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
277                                          ("stretch_id", FIELDTYPE_INT),
278                                          ("name", FIELDTYPE_INT)],
279                                         [(1, 0, 1), (2, 3, 2)])
280            discharges = AutoTransientTable(self.transientdb, mem_discharges)
281    
282            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
283                                         discharges, "stretch_id",
284                                         outer_join = True)
285    
286            self.assertEquals(table.NumRows(), 4)
287            self.assertEquals(table.NumColumns(), 5)
288    
289            # HasColumn
290            self.assertEquals([c.name for c in table.Columns()],
291                              ["stretch_id", "name", "disch_id", "stretch_id_",
292                               "name_"])
293    
294        def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
295            """Test TransientJoinedTable name-collisions do not modifying in place
296    
297            The name collision work-around by appending underscores
298            accidentally modified the column objects in place. We do two
299            joins therefore in reverse order to detect this: The first join
300            will lead to a modified name in the column object of the right
301            table which is then used as the left table in the second join so
302            the underscored name will appear before the non-underscored one
303            in the list of column names after the second join.
304            """
305            mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
306                                ("name", FIELDTYPE_INT)],
307                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
308            table1 = AutoTransientTable(self.transientdb, mem1)
309    
310            mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
311                                ("name", FIELDTYPE_INT)],
312                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
313            table2 = AutoTransientTable(self.transientdb, mem2)
314    
315            table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
316                                         table2, "stretch_id", outer_join = True)
317            table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
318                                         table1, "stretch_id", outer_join = True)
319    
320            self.assertEquals([c.name for c in table.Columns()],
321                              ["stretch_id", "name", "stretch_id_", "name_"])
322    
323      def test_transient_table_read_twice(self):      def test_transient_table_read_twice(self):
324          """Test TransientTable.ReadRowAsDict() reading the same record twice"""          """Test TransientTable.ReadRowAsDict() reading the same record twice"""
325          simple = MemoryTable([("type", FIELDTYPE_STRING),          simple = MemoryTable([("type", FIELDTYPE_STRING),
# Line 274  class TestTransientTable(unittest.TestCa Line 379  class TestTransientTable(unittest.TestCa
379                            table.SimpleQuery,                            table.SimpleQuery,
380                            table.Column(1), "<<", table.Column(2))                            table.Column(1), "<<", table.Column(2))
381    
382        def test_transienttable_to_dbf(self):
383            memtable = MemoryTable([("type", FIELDTYPE_STRING),
384                                    ("value", FIELDTYPE_DOUBLE),
385                                    ("code", FIELDTYPE_INT)],
386                                   [("UNKNOWN", 0.0, 0),
387                                    ("Foo", 0.5, -1),
388                                    ("Foo", 1.0/256, 100),
389                                    ("bar", 1e10, 17)])
390            table = TransientTable(self.transientdb, memtable)
391            filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
392            table_to_dbf(table, filename)
393    
394            dbf = dbflib.DBFFile(filename)
395            self.assertEquals(dbf.read_record(2),
396                              {'code': 100, 'type': 'Foo', 'value': 0.00390625})
397            self.assertEquals(dbf.field_count(), 3)
398            self.assertEquals(dbf.record_count(), 4)
399            self.assertEquals(dbf.field_info(0),
400                              (dbflib.FTString, "type", 7, 0))
401            self.assertEquals(dbf.field_info(1),
402                              (dbflib.FTDouble, "value", 24, 12))
403            self.assertEquals(dbf.field_info(2),
404                              (dbflib.FTInteger, "code", 3, 0))
405    
406    
407    
408  if __name__ == "__main__":  if __name__ == "__main__":
409      support.run_tests()      support.run_tests()

Legend:
Removed from v.998  
changed lines
  Added in v.1662

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26