/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 948 by jonathan, Tue May 20 15:27:31 2003 UTC revision 1381 by bh, Tue Jul 8 16:37:46 2003 UTC
# Line 19  import unittest Line 19  import unittest
19  import support  import support
20  support.initthuban()  support.initthuban()
21    
22    import dbflib
23  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
27    
# Line 50  class TestTransientTable(unittest.TestCa Line 51  class TestTransientTable(unittest.TestCa
51          self.assertEquals(table.NumRows(), 156)          self.assertEquals(table.NumRows(), 156)
52          self.assertEquals(table.NumColumns(), 8)          self.assertEquals(table.NumColumns(), 8)
53    
54          # Check one each of the possible field types. The width and          # Check one each of the possible field types.
         # decimal precision is always 0.  
55          columns = table.Columns()          columns = table.Columns()
56          self.assertEquals(columns[0].name, 'AREA')          self.assertEquals(columns[0].name, 'AREA')
57          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
# Line 101  class TestTransientTable(unittest.TestCa Line 101  class TestTransientTable(unittest.TestCa
101          # The transient_table method should return the table itself          # The transient_table method should return the table itself
102          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
103    
104            # The title is simply copied over from the original table
105            self.assertEquals(table.Title(), orig_table.Title())
106    
107            # The TransientTable class itself doesn't implement the
108            # Dependencies method, so we don't test it.
109    
110    
111      def test_auto_transient_table(self):      def test_auto_transient_table(self):
112          """Test AutoTransientTable(dbftable)          """Test AutoTransientTable(dbftable)
# Line 135  class TestTransientTable(unittest.TestCa Line 141  class TestTransientTable(unittest.TestCa
141                            [144])                            [144])
142    
143          # test using a Column object as the right parameter          # test using a Column object as the right parameter
144          self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),          self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
145                                              "==",                                              "==",
146                                              table.Column("POPYREG")),                                              table.Column("POPYREG")),
147                            range(156))                            range(156))
148    
149        def test_auto_transient_table_dependencies(self):
150            """Test AutoTransientTable.Dependencies()"""
151            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
152                                               "political.dbf"))
153            table = AutoTransientTable(self.transientdb, orig_table)
154            self.assertEquals(table.Dependencies(), (orig_table,))
155    
156        def test_auto_transient_table_title(self):
157            """Test AutoTransientTable.Title()"""
158            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
159                                               "political.dbf"))
160            table = AutoTransientTable(self.transientdb, orig_table)
161            # The title is of course the same as that of the original table
162            self.assertEquals(table.Title(), orig_table.Title())
163    
164      def test_transient_joined_table(self):      def test_transient_joined_table(self):
165          """Test TransientJoinedTable"""          """Test TransientJoinedTable"""
166          simple = MemoryTable([("type", FIELDTYPE_STRING),          simple = MemoryTable([("type", FIELDTYPE_STRING),
# Line 185  class TestTransientTable(unittest.TestCa Line 206  class TestTransientTable(unittest.TestCa
206          # The transient_table method should return the table itself          # The transient_table method should return the table itself
207          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
208    
209            # The TransientJoinedTable depends on both input tables
210            self.assertEquals(table.Dependencies(), (landmarks, auto))
211    
212            # The title is constructed from the titles of the input tables.
213            self.assertEquals(table.Title(),
214                              "Join of %s and %s" % (landmarks.Title(),
215                                                     auto.Title()))
216    
217    
218        def test_transient_joined_table_same_column_name(self):
219            """Test TransientJoinedTable join on columns with same name
220    
221            The transient DB maps the column names used by the tables to
222            another set of names used only inside the SQLite database. There
223            was a bug in the way this mapping was used when joining on
224            fields with the same names in both tables so that the joined
225            table ended up joining on the same column in the same table.
226            """
227            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
228                                        [(i,) for i in range(4)])
229            stretches = AutoTransientTable(self.transientdb, mem_stretches)
230    
231            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
232                                          ("stretch_id", FIELDTYPE_INT)],
233                                         [(1, 0), (2, 3)])
234            discharges = AutoTransientTable(self.transientdb, mem_discharges)
235    
236            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
237                                         discharges, "stretch_id",
238                                         outer_join = True)
239    
240            self.assertEquals(table.NumRows(), 4)
241            self.assertEquals(table.NumColumns(), 3)
242    
243            # HasColumn
244            self.failUnless(table.HasColumn("stretch_id"))
245            self.failUnless(table.HasColumn("disch_id"))
246    
247    
248        def test_transient_joined_table_with_equal_column_names(self):
249            """Test TransientJoinedTable join on tables with equal column names
250    
251            If a name collision occurs for the field names, underscores are
252            appended as long as any collision is resolved.
253            """
254            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
255                                         ("name", FIELDTYPE_INT)],
256                                        [(0, 10), (1, 11), (2, 12), (3, 13) ])
257            stretches = AutoTransientTable(self.transientdb, mem_stretches)
258    
259            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
260                                          ("stretch_id", FIELDTYPE_INT),
261                                          ("name", FIELDTYPE_INT)],
262                                         [(1, 0, 1), (2, 3, 2)])
263            discharges = AutoTransientTable(self.transientdb, mem_discharges)
264    
265            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
266                                         discharges, "stretch_id",
267                                         outer_join = True)
268    
269            self.assertEquals(table.NumRows(), 4)
270            self.assertEquals(table.NumColumns(), 5)
271    
272            # HasColumn
273            self.assertEquals([c.name for c in table.Columns()],
274                              ["stretch_id", "name", "disch_id", "stretch_id_",
275                               "name_"])
276    
277        def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
278            """Test TransientJoinedTable name-collisions do not modifying in place
279    
280            The name collision work-around by appending underscores
281            accidentally modified the column objects in place. We do two
282            joins therefore in reverse order to detect this: The first join
283            will lead to a modified name in the column object of the right
284            table which is then used as the left table in the second join so
285            the underscored name will appear before the non-underscored one
286            in the list of column names after the second join.
287            """
288            mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
289                                ("name", FIELDTYPE_INT)],
290                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
291            table1 = AutoTransientTable(self.transientdb, mem1)
292    
293            mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
294                                ("name", FIELDTYPE_INT)],
295                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
296            table2 = AutoTransientTable(self.transientdb, mem2)
297    
298            table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
299                                         table2, "stretch_id", outer_join = True)
300            table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
301                                         table1, "stretch_id", outer_join = True)
302    
303            self.assertEquals([c.name for c in table.Columns()],
304                              ["stretch_id", "name", "stretch_id_", "name_"])
305    
306      def test_transient_table_read_twice(self):      def test_transient_table_read_twice(self):
307          """Test TransientTable.ReadRowAsDict() reading the same record twice"""          """Test TransientTable.ReadRowAsDict() reading the same record twice"""
# Line 245  class TestTransientTable(unittest.TestCa Line 362  class TestTransientTable(unittest.TestCa
362                            table.SimpleQuery,                            table.SimpleQuery,
363                            table.Column(1), "<<", table.Column(2))                            table.Column(1), "<<", table.Column(2))
364    
365        def test_transienttable_to_dbf(self):
366            memtable = MemoryTable([("type", FIELDTYPE_STRING),
367                                    ("value", FIELDTYPE_DOUBLE),
368                                    ("code", FIELDTYPE_INT)],
369                                   [("UNKNOWN", 0.0, 0),
370                                    ("Foo", 0.5, -1),
371                                    ("Foo", 1.0/256, 100),
372                                    ("bar", 1e10, 17)])
373            table = TransientTable(self.transientdb, memtable)
374            filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
375            table_to_dbf(table, filename)
376    
377            dbf = dbflib.DBFFile(filename)
378            self.assertEquals(dbf.read_record(2),
379                              {'code': 100, 'type': 'Foo', 'value': 0.00390625})
380            self.assertEquals(dbf.field_count(), 3)
381            self.assertEquals(dbf.record_count(), 4)
382            self.assertEquals(dbf.field_info(0),
383                              (dbflib.FTString, "type", 7, 0))
384            self.assertEquals(dbf.field_info(1),
385                              (dbflib.FTDouble, "value", 24, 12))
386            self.assertEquals(dbf.field_info(2),
387                              (dbflib.FTInteger, "code", 3, 0))
388    
389    
390    
391  if __name__ == "__main__":  if __name__ == "__main__":
392      support.run_tests()      support.run_tests()

Legend:
Removed from v.948  
changed lines
  Added in v.1381

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26