/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 948 by jonathan, Tue May 20 15:27:31 2003 UTC revision 1924 by bh, Fri Nov 7 12:07:11 2003 UTC
# Line 19  import unittest Line 19  import unittest
19  import support  import support
20  support.initthuban()  support.initthuban()
21    
22    import dbflib
23  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
27    
# Line 50  class TestTransientTable(unittest.TestCa Line 51  class TestTransientTable(unittest.TestCa
51          self.assertEquals(table.NumRows(), 156)          self.assertEquals(table.NumRows(), 156)
52          self.assertEquals(table.NumColumns(), 8)          self.assertEquals(table.NumColumns(), 8)
53    
54          # Check one each of the possible field types. The width and          # Check one each of the possible field types.
         # decimal precision is always 0.  
55          columns = table.Columns()          columns = table.Columns()
56          self.assertEquals(columns[0].name, 'AREA')          self.assertEquals(columns[0].name, 'AREA')
57          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
# Line 74  class TestTransientTable(unittest.TestCa Line 74  class TestTransientTable(unittest.TestCa
74                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
75                             'POPYREG': '1',                             'POPYREG': '1',
76                             'PONET_ID': 145})                             'PONET_ID': 145})
77            self.assertEquals(table.ReadRowAsDict(144, row_is_ordinal = 1),
78                              {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
79                               'AREA': 19.462,
80                               'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
81                               'POPYREG': '1',
82                               'PONET_ID': 145})
83          self.assertEquals(table.ReadValue(144, "AREA"), 19.462)          self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
84          self.assertEquals(table.ReadValue(144, 3), 145)          self.assertEquals(table.ReadValue(144, 3), 145)
85            self.assertEquals(table.ReadValue(144, "AREA", row_is_ordinal = 1),
86                              19.462)
87            self.assertEquals(table.ReadValue(144, 3, row_is_ordinal = 1), 145)
88    
89            self.assertEquals(table.RowIdToOrdinal(23), 23)
90            self.assertEquals(table.RowOrdinalToId(23), 23)
91    
92          # ValueRange may induce a copy to the transient database.          # ValueRange may induce a copy to the transient database.
93          # Therefore we put it last so that we can execute this method          # Therefore we put it last so that we can execute this method
# Line 101  class TestTransientTable(unittest.TestCa Line 113  class TestTransientTable(unittest.TestCa
113          # The transient_table method should return the table itself          # The transient_table method should return the table itself
114          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
115    
116            # The title is simply copied over from the original table
117            self.assertEquals(table.Title(), orig_table.Title())
118    
119            # The TransientTable class itself doesn't implement the
120            # Dependencies method, so we don't test it.
121    
122    
123      def test_auto_transient_table(self):      def test_auto_transient_table(self):
124          """Test AutoTransientTable(dbftable)          """Test AutoTransientTable(dbftable)
# Line 118  class TestTransientTable(unittest.TestCa Line 136  class TestTransientTable(unittest.TestCa
136          # least one call to a method that copies to the transient db at          # least one call to a method that copies to the transient db at
137          # its end.          # its end.
138          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
139            # At this point the data has probably not been copied to the
140            # transient DB yet, so we force it by calling the
141            # transient_table method.
142            table.transient_table()
143    
144            # Run the tests again.
145          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
146    
147      def test_auto_transient_table_query(self):      def test_auto_transient_table_query(self):
# Line 135  class TestTransientTable(unittest.TestCa Line 159  class TestTransientTable(unittest.TestCa
159                            [144])                            [144])
160    
161          # test using a Column object as the right parameter          # test using a Column object as the right parameter
162          self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),          self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
163                                              "==",                                              "==",
164                                              table.Column("POPYREG")),                                              table.Column("POPYREG")),
165                            range(156))                            range(156))
166    
167        def test_auto_transient_table_dependencies(self):
168            """Test AutoTransientTable.Dependencies()"""
169            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
170                                               "political.dbf"))
171            table = AutoTransientTable(self.transientdb, orig_table)
172            self.assertEquals(table.Dependencies(), (orig_table,))
173    
174        def test_auto_transient_table_title(self):
175            """Test AutoTransientTable.Title()"""
176            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
177                                               "political.dbf"))
178            table = AutoTransientTable(self.transientdb, orig_table)
179            # The title is of course the same as that of the original table
180            self.assertEquals(table.Title(), orig_table.Title())
181    
182      def test_transient_joined_table(self):      def test_transient_joined_table(self):
183          """Test TransientJoinedTable"""          """Test TransientJoinedTable"""
184          simple = MemoryTable([("type", FIELDTYPE_STRING),          simple = MemoryTable([("type", FIELDTYPE_STRING),
# Line 181  class TestTransientTable(unittest.TestCa Line 220  class TestTransientTable(unittest.TestCa
220                             'code': 1, 'type': 'RUINS'})                             'code': 1, 'type': 'RUINS'})
221          self.assertEquals(table.ReadValue(22, "type"), 'RUINS')          self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
222          self.assertEquals(table.ReadValue(22, 7), 1)          self.assertEquals(table.ReadValue(22, 7), 1)
223            self.assertEquals(table.ReadValue(22, "type", row_is_ordinal = 1),
224                              "RUINS")
225            self.assertEquals(table.ReadValue(22, 7, row_is_ordinal = 1), 1)
226            self.assertEquals(table.RowIdToOrdinal(23), 23)
227            self.assertEquals(table.RowOrdinalToId(23), 23)
228    
229          # The transient_table method should return the table itself          # The transient_table method should return the table itself
230          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
231    
232            # The TransientJoinedTable depends on both input tables
233            self.assertEquals(table.Dependencies(), (landmarks, auto))
234    
235            # The title is constructed from the titles of the input tables.
236            self.assertEquals(table.Title(),
237                              "Join of %s and %s" % (landmarks.Title(),
238                                                     auto.Title()))
239    
240    
241        def test_transient_joined_table_same_column_name(self):
242            """Test TransientJoinedTable join on columns with same name
243    
244            The transient DB maps the column names used by the tables to
245            another set of names used only inside the SQLite database. There
246            was a bug in the way this mapping was used when joining on
247            fields with the same names in both tables so that the joined
248            table ended up joining on the same column in the same table.
249            """
250            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
251                                        [(i,) for i in range(4)])
252            stretches = AutoTransientTable(self.transientdb, mem_stretches)
253    
254            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
255                                          ("stretch_id", FIELDTYPE_INT)],
256                                         [(1, 0), (2, 3)])
257            discharges = AutoTransientTable(self.transientdb, mem_discharges)
258    
259            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
260                                         discharges, "stretch_id",
261                                         outer_join = True)
262    
263            self.assertEquals(table.NumRows(), 4)
264            self.assertEquals(table.NumColumns(), 3)
265    
266            # HasColumn
267            self.failUnless(table.HasColumn("stretch_id"))
268            self.failUnless(table.HasColumn("disch_id"))
269    
270    
271        def test_transient_joined_table_with_equal_column_names(self):
272            """Test TransientJoinedTable join on tables with equal column names
273    
274            If a name collision occurs for the field names, underscores are
275            appended as long as any collision is resolved.
276            """
277            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
278                                         ("name", FIELDTYPE_INT)],
279                                        [(0, 10), (1, 11), (2, 12), (3, 13) ])
280            stretches = AutoTransientTable(self.transientdb, mem_stretches)
281    
282            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
283                                          ("stretch_id", FIELDTYPE_INT),
284                                          ("name", FIELDTYPE_INT)],
285                                         [(1, 0, 1), (2, 3, 2)])
286            discharges = AutoTransientTable(self.transientdb, mem_discharges)
287    
288            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
289                                         discharges, "stretch_id",
290                                         outer_join = True)
291    
292            self.assertEquals(table.NumRows(), 4)
293            self.assertEquals(table.NumColumns(), 5)
294    
295            # HasColumn
296            self.assertEquals([c.name for c in table.Columns()],
297                              ["stretch_id", "name", "disch_id", "stretch_id_",
298                               "name_"])
299    
300        def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
301            """Test TransientJoinedTable name-collisions do not modifying in place
302    
303            The name collision work-around by appending underscores
304            accidentally modified the column objects in place. We do two
305            joins therefore in reverse order to detect this: The first join
306            will lead to a modified name in the column object of the right
307            table which is then used as the left table in the second join so
308            the underscored name will appear before the non-underscored one
309            in the list of column names after the second join.
310            """
311            mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
312                                ("name", FIELDTYPE_INT)],
313                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
314            table1 = AutoTransientTable(self.transientdb, mem1)
315    
316            mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
317                                ("name", FIELDTYPE_INT)],
318                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
319            table2 = AutoTransientTable(self.transientdb, mem2)
320    
321            table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
322                                         table2, "stretch_id", outer_join = True)
323            table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
324                                         table1, "stretch_id", outer_join = True)
325    
326            self.assertEquals([c.name for c in table.Columns()],
327                              ["stretch_id", "name", "stretch_id_", "name_"])
328    
329      def test_transient_table_read_twice(self):      def test_transient_table_read_twice(self):
330          """Test TransientTable.ReadRowAsDict() reading the same record twice"""          """Test TransientTable.ReadRowAsDict() reading the same record twice"""
# Line 245  class TestTransientTable(unittest.TestCa Line 385  class TestTransientTable(unittest.TestCa
385                            table.SimpleQuery,                            table.SimpleQuery,
386                            table.Column(1), "<<", table.Column(2))                            table.Column(1), "<<", table.Column(2))
387    
388        def test_transienttable_to_dbf(self):
389            memtable = MemoryTable([("type", FIELDTYPE_STRING),
390                                    ("value", FIELDTYPE_DOUBLE),
391                                    ("code", FIELDTYPE_INT)],
392                                   [("UNKNOWN", 0.0, 0),
393                                    ("Foo", 0.5, -1),
394                                    ("Foo", 1.0/256, 100),
395                                    ("bar", 1e10, 17)])
396            table = TransientTable(self.transientdb, memtable)
397            filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
398            table_to_dbf(table, filename)
399    
400            dbf = dbflib.DBFFile(filename)
401            self.assertEquals(dbf.read_record(2),
402                              {'code': 100, 'type': 'Foo', 'value': 0.00390625})
403            self.assertEquals(dbf.field_count(), 3)
404            self.assertEquals(dbf.record_count(), 4)
405            self.assertEquals(dbf.field_info(0),
406                              (dbflib.FTString, "type", 7, 0))
407            self.assertEquals(dbf.field_info(1),
408                              (dbflib.FTDouble, "value", 24, 12))
409            self.assertEquals(dbf.field_info(2),
410                              (dbflib.FTInteger, "code", 3, 0))
411    
412    
413    
414  if __name__ == "__main__":  if __name__ == "__main__":
415      support.run_tests()      support.run_tests()

Legend:
Removed from v.948  
changed lines
  Added in v.1924

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26