/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 998 by bh, Thu May 22 19:29:39 2003 UTC revision 1381 by bh, Tue Jul 8 16:37:46 2003 UTC
# Line 19  import unittest Line 19  import unittest
19  import support  import support
20  support.initthuban()  support.initthuban()
21    
22    import dbflib
23  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
27    
# Line 50  class TestTransientTable(unittest.TestCa Line 51  class TestTransientTable(unittest.TestCa
51          self.assertEquals(table.NumRows(), 156)          self.assertEquals(table.NumRows(), 156)
52          self.assertEquals(table.NumColumns(), 8)          self.assertEquals(table.NumColumns(), 8)
53    
54          # Check one each of the possible field types. The width and          # Check one each of the possible field types.
         # decimal precision is always 0.  
55          columns = table.Columns()          columns = table.Columns()
56          self.assertEquals(columns[0].name, 'AREA')          self.assertEquals(columns[0].name, 'AREA')
57          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
# Line 215  class TestTransientTable(unittest.TestCa Line 215  class TestTransientTable(unittest.TestCa
215                                                   auto.Title()))                                                   auto.Title()))
216    
217    
218        def test_transient_joined_table_same_column_name(self):
219            """Test TransientJoinedTable join on columns with same name
220    
221            The transient DB maps the column names used by the tables to
222            another set of names used only inside the SQLite database. There
223            was a bug in the way this mapping was used when joining on
224            fields with the same names in both tables so that the joined
225            table ended up joining on the same column in the same table.
226            """
227            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
228                                        [(i,) for i in range(4)])
229            stretches = AutoTransientTable(self.transientdb, mem_stretches)
230    
231            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
232                                          ("stretch_id", FIELDTYPE_INT)],
233                                         [(1, 0), (2, 3)])
234            discharges = AutoTransientTable(self.transientdb, mem_discharges)
235    
236            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
237                                         discharges, "stretch_id",
238                                         outer_join = True)
239    
240            self.assertEquals(table.NumRows(), 4)
241            self.assertEquals(table.NumColumns(), 3)
242    
243            # HasColumn
244            self.failUnless(table.HasColumn("stretch_id"))
245            self.failUnless(table.HasColumn("disch_id"))
246    
247    
248        def test_transient_joined_table_with_equal_column_names(self):
249            """Test TransientJoinedTable join on tables with equal column names
250    
251            If a name collision occurs for the field names, underscores are
252            appended as long as any collision is resolved.
253            """
254            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
255                                         ("name", FIELDTYPE_INT)],
256                                        [(0, 10), (1, 11), (2, 12), (3, 13) ])
257            stretches = AutoTransientTable(self.transientdb, mem_stretches)
258    
259            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
260                                          ("stretch_id", FIELDTYPE_INT),
261                                          ("name", FIELDTYPE_INT)],
262                                         [(1, 0, 1), (2, 3, 2)])
263            discharges = AutoTransientTable(self.transientdb, mem_discharges)
264    
265            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
266                                         discharges, "stretch_id",
267                                         outer_join = True)
268    
269            self.assertEquals(table.NumRows(), 4)
270            self.assertEquals(table.NumColumns(), 5)
271    
272            # HasColumn
273            self.assertEquals([c.name for c in table.Columns()],
274                              ["stretch_id", "name", "disch_id", "stretch_id_",
275                               "name_"])
276    
277        def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
278            """Test TransientJoinedTable name-collisions do not modifying in place
279    
280            The name collision work-around by appending underscores
281            accidentally modified the column objects in place. We do two
282            joins therefore in reverse order to detect this: The first join
283            will lead to a modified name in the column object of the right
284            table which is then used as the left table in the second join so
285            the underscored name will appear before the non-underscored one
286            in the list of column names after the second join.
287            """
288            mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
289                                ("name", FIELDTYPE_INT)],
290                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
291            table1 = AutoTransientTable(self.transientdb, mem1)
292    
293            mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
294                                ("name", FIELDTYPE_INT)],
295                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
296            table2 = AutoTransientTable(self.transientdb, mem2)
297    
298            table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
299                                         table2, "stretch_id", outer_join = True)
300            table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
301                                         table1, "stretch_id", outer_join = True)
302    
303            self.assertEquals([c.name for c in table.Columns()],
304                              ["stretch_id", "name", "stretch_id_", "name_"])
305    
306      def test_transient_table_read_twice(self):      def test_transient_table_read_twice(self):
307          """Test TransientTable.ReadRowAsDict() reading the same record twice"""          """Test TransientTable.ReadRowAsDict() reading the same record twice"""
308          simple = MemoryTable([("type", FIELDTYPE_STRING),          simple = MemoryTable([("type", FIELDTYPE_STRING),
# Line 274  class TestTransientTable(unittest.TestCa Line 362  class TestTransientTable(unittest.TestCa
362                            table.SimpleQuery,                            table.SimpleQuery,
363                            table.Column(1), "<<", table.Column(2))                            table.Column(1), "<<", table.Column(2))
364    
365        def test_transienttable_to_dbf(self):
366            memtable = MemoryTable([("type", FIELDTYPE_STRING),
367                                    ("value", FIELDTYPE_DOUBLE),
368                                    ("code", FIELDTYPE_INT)],
369                                   [("UNKNOWN", 0.0, 0),
370                                    ("Foo", 0.5, -1),
371                                    ("Foo", 1.0/256, 100),
372                                    ("bar", 1e10, 17)])
373            table = TransientTable(self.transientdb, memtable)
374            filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
375            table_to_dbf(table, filename)
376    
377            dbf = dbflib.DBFFile(filename)
378            self.assertEquals(dbf.read_record(2),
379                              {'code': 100, 'type': 'Foo', 'value': 0.00390625})
380            self.assertEquals(dbf.field_count(), 3)
381            self.assertEquals(dbf.record_count(), 4)
382            self.assertEquals(dbf.field_info(0),
383                              (dbflib.FTString, "type", 7, 0))
384            self.assertEquals(dbf.field_info(1),
385                              (dbflib.FTDouble, "value", 24, 12))
386            self.assertEquals(dbf.field_info(2),
387                              (dbflib.FTInteger, "code", 3, 0))
388    
389    
390    
391  if __name__ == "__main__":  if __name__ == "__main__":
392      support.run_tests()      support.run_tests()

Legend:
Removed from v.998  
changed lines
  Added in v.1381

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26