/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 849 by bh, Wed May 7 11:55:31 2003 UTC revision 1662 by bh, Wed Aug 27 13:51:01 2003 UTC
# Line 19  import unittest Line 19  import unittest
19  import support  import support
20  support.initthuban()  support.initthuban()
21    
22    import dbflib
23  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
27    
# Line 50  class TestTransientTable(unittest.TestCa Line 51  class TestTransientTable(unittest.TestCa
51          self.assertEquals(table.NumRows(), 156)          self.assertEquals(table.NumRows(), 156)
52          self.assertEquals(table.NumColumns(), 8)          self.assertEquals(table.NumColumns(), 8)
53    
54          # Check one each of the possible field types. The width and          # Check one each of the possible field types.
         # decimal precision is always 0.  
55          columns = table.Columns()          columns = table.Columns()
56          self.assertEquals(columns[0].name, 'AREA')          self.assertEquals(columns[0].name, 'AREA')
57          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
# Line 74  class TestTransientTable(unittest.TestCa Line 74  class TestTransientTable(unittest.TestCa
74                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
75                             'POPYREG': '1',                             'POPYREG': '1',
76                             'PONET_ID': 145})                             'PONET_ID': 145})
77            self.assertEquals(table.ReadRowAsDict(144, row_is_ordinal = 1),
78                              {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
79                               'AREA': 19.462,
80                               'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
81                               'POPYREG': '1',
82                               'PONET_ID': 145})
83          self.assertEquals(table.ReadValue(144, "AREA"), 19.462)          self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
84          self.assertEquals(table.ReadValue(144, 3), 145)          self.assertEquals(table.ReadValue(144, 3), 145)
85            self.assertEquals(table.ReadValue(144, "AREA", row_is_ordinal = 1),
86                              19.462)
87            self.assertEquals(table.ReadValue(144, 3, row_is_ordinal = 1), 145)
88    
89            self.assertEquals(table.RowIdToOrdinal(23), 23)
90            self.assertEquals(table.RowOrdinalToId(23), 23)
91    
92          # ValueRange may induce a copy to the transient database.          # ValueRange may induce a copy to the transient database.
93          # Therefore we put it last so that we can execute this method          # Therefore we put it last so that we can execute this method
# Line 101  class TestTransientTable(unittest.TestCa Line 113  class TestTransientTable(unittest.TestCa
113          # The transient_table method should return the table itself          # The transient_table method should return the table itself
114          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
115    
116            # The title is simply copied over from the original table
117            self.assertEquals(table.Title(), orig_table.Title())
118    
119            # The TransientTable class itself doesn't implement the
120            # Dependencies method, so we don't test it.
121    
122    
123      def test_auto_transient_table(self):      def test_auto_transient_table(self):
124          """Test AutoTransientTable(dbftable)          """Test AutoTransientTable(dbftable)
# Line 125  class TestTransientTable(unittest.TestCa Line 143  class TestTransientTable(unittest.TestCa
143          orig_table = DBFTable(os.path.join("..", "Data", "iceland",          orig_table = DBFTable(os.path.join("..", "Data", "iceland",
144                                             "political.dbf"))                                             "political.dbf"))
145          table = AutoTransientTable(self.transientdb, orig_table)          table = AutoTransientTable(self.transientdb, orig_table)
146          # Only a simple test here. The AutoTransientTable siply          # Only a simple test here. The AutoTransientTable simply
147          # delegates to its transient table so it should be OK that the          # delegates to its transient table so it should be OK that the
148          # real test for it is in test_transient_table_query. However,          # real test for it is in test_transient_table_query. However,
149          # it's important to check that the column handling works          # it's important to check that the column handling works
# Line 134  class TestTransientTable(unittest.TestCa Line 152  class TestTransientTable(unittest.TestCa
152          self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),          self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
153                            [144])                            [144])
154    
155            # test using a Column object as the right parameter
156            self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
157                                                "==",
158                                                table.Column("POPYREG")),
159                              range(156))
160    
161        def test_auto_transient_table_dependencies(self):
162            """Test AutoTransientTable.Dependencies()"""
163            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
164                                               "political.dbf"))
165            table = AutoTransientTable(self.transientdb, orig_table)
166            self.assertEquals(table.Dependencies(), (orig_table,))
167    
168        def test_auto_transient_table_title(self):
169            """Test AutoTransientTable.Title()"""
170            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
171                                               "political.dbf"))
172            table = AutoTransientTable(self.transientdb, orig_table)
173            # The title is of course the same as that of the original table
174            self.assertEquals(table.Title(), orig_table.Title())
175    
176      def test_transient_joined_table(self):      def test_transient_joined_table(self):
177          """Test TransientJoinedTable"""          """Test TransientJoinedTable"""
178          simple = MemoryTable([("type", FIELDTYPE_STRING),          simple = MemoryTable([("type", FIELDTYPE_STRING),
# Line 175  class TestTransientTable(unittest.TestCa Line 214  class TestTransientTable(unittest.TestCa
214                             'code': 1, 'type': 'RUINS'})                             'code': 1, 'type': 'RUINS'})
215          self.assertEquals(table.ReadValue(22, "type"), 'RUINS')          self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
216          self.assertEquals(table.ReadValue(22, 7), 1)          self.assertEquals(table.ReadValue(22, 7), 1)
217            self.assertEquals(table.ReadValue(22, "type", row_is_ordinal = 1),
218                              "RUINS")
219            self.assertEquals(table.ReadValue(22, 7, row_is_ordinal = 1), 1)
220            self.assertEquals(table.RowIdToOrdinal(23), 23)
221            self.assertEquals(table.RowOrdinalToId(23), 23)
222    
223          # The transient_table method should return the table itself          # The transient_table method should return the table itself
224          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
225    
226            # The TransientJoinedTable depends on both input tables
227            self.assertEquals(table.Dependencies(), (landmarks, auto))
228    
229            # The title is constructed from the titles of the input tables.
230            self.assertEquals(table.Title(),
231                              "Join of %s and %s" % (landmarks.Title(),
232                                                     auto.Title()))
233    
234    
235        def test_transient_joined_table_same_column_name(self):
236            """Test TransientJoinedTable join on columns with same name
237    
238            The transient DB maps the column names used by the tables to
239            another set of names used only inside the SQLite database. There
240            was a bug in the way this mapping was used when joining on
241            fields with the same names in both tables so that the joined
242            table ended up joining on the same column in the same table.
243            """
244            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
245                                        [(i,) for i in range(4)])
246            stretches = AutoTransientTable(self.transientdb, mem_stretches)
247    
248            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
249                                          ("stretch_id", FIELDTYPE_INT)],
250                                         [(1, 0), (2, 3)])
251            discharges = AutoTransientTable(self.transientdb, mem_discharges)
252    
253            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
254                                         discharges, "stretch_id",
255                                         outer_join = True)
256    
257            self.assertEquals(table.NumRows(), 4)
258            self.assertEquals(table.NumColumns(), 3)
259    
260            # HasColumn
261            self.failUnless(table.HasColumn("stretch_id"))
262            self.failUnless(table.HasColumn("disch_id"))
263    
264    
265        def test_transient_joined_table_with_equal_column_names(self):
266            """Test TransientJoinedTable join on tables with equal column names
267    
268            If a name collision occurs for the field names, underscores are
269            appended as long as any collision is resolved.
270            """
271            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
272                                         ("name", FIELDTYPE_INT)],
273                                        [(0, 10), (1, 11), (2, 12), (3, 13) ])
274            stretches = AutoTransientTable(self.transientdb, mem_stretches)
275    
276            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
277                                          ("stretch_id", FIELDTYPE_INT),
278                                          ("name", FIELDTYPE_INT)],
279                                         [(1, 0, 1), (2, 3, 2)])
280            discharges = AutoTransientTable(self.transientdb, mem_discharges)
281    
282            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
283                                         discharges, "stretch_id",
284                                         outer_join = True)
285    
286            self.assertEquals(table.NumRows(), 4)
287            self.assertEquals(table.NumColumns(), 5)
288    
289            # HasColumn
290            self.assertEquals([c.name for c in table.Columns()],
291                              ["stretch_id", "name", "disch_id", "stretch_id_",
292                               "name_"])
293    
294        def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
295            """Test TransientJoinedTable name-collisions do not modifying in place
296    
297            The name collision work-around by appending underscores
298            accidentally modified the column objects in place. We do two
299            joins therefore in reverse order to detect this: The first join
300            will lead to a modified name in the column object of the right
301            table which is then used as the left table in the second join so
302            the underscored name will appear before the non-underscored one
303            in the list of column names after the second join.
304            """
305            mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
306                                ("name", FIELDTYPE_INT)],
307                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
308            table1 = AutoTransientTable(self.transientdb, mem1)
309    
310            mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
311                                ("name", FIELDTYPE_INT)],
312                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
313            table2 = AutoTransientTable(self.transientdb, mem2)
314    
315            table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
316                                         table2, "stretch_id", outer_join = True)
317            table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
318                                         table1, "stretch_id", outer_join = True)
319    
320            self.assertEquals([c.name for c in table.Columns()],
321                              ["stretch_id", "name", "stretch_id_", "name_"])
322    
323      def test_transient_table_read_twice(self):      def test_transient_table_read_twice(self):
324          """Test TransientTable.ReadRowAsDict() reading the same record twice"""          """Test TransientTable.ReadRowAsDict() reading the same record twice"""
# Line 239  class TestTransientTable(unittest.TestCa Line 379  class TestTransientTable(unittest.TestCa
379                            table.SimpleQuery,                            table.SimpleQuery,
380                            table.Column(1), "<<", table.Column(2))                            table.Column(1), "<<", table.Column(2))
381    
382        def test_transienttable_to_dbf(self):
383            memtable = MemoryTable([("type", FIELDTYPE_STRING),
384                                    ("value", FIELDTYPE_DOUBLE),
385                                    ("code", FIELDTYPE_INT)],
386                                   [("UNKNOWN", 0.0, 0),
387                                    ("Foo", 0.5, -1),
388                                    ("Foo", 1.0/256, 100),
389                                    ("bar", 1e10, 17)])
390            table = TransientTable(self.transientdb, memtable)
391            filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
392            table_to_dbf(table, filename)
393    
394            dbf = dbflib.DBFFile(filename)
395            self.assertEquals(dbf.read_record(2),
396                              {'code': 100, 'type': 'Foo', 'value': 0.00390625})
397            self.assertEquals(dbf.field_count(), 3)
398            self.assertEquals(dbf.record_count(), 4)
399            self.assertEquals(dbf.field_info(0),
400                              (dbflib.FTString, "type", 7, 0))
401            self.assertEquals(dbf.field_info(1),
402                              (dbflib.FTDouble, "value", 24, 12))
403            self.assertEquals(dbf.field_info(2),
404                              (dbflib.FTInteger, "code", 3, 0))
405    
406    
407    
408  if __name__ == "__main__":  if __name__ == "__main__":
409      support.run_tests()      support.run_tests()

Legend:
Removed from v.849  
changed lines
  Added in v.1662

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26