/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 818 by bh, Mon May 5 17:18:31 2003 UTC revision 1381 by bh, Tue Jul 8 16:37:46 2003 UTC
# Line 19  import unittest Line 19  import unittest
19  import support  import support
20  support.initthuban()  support.initthuban()
21    
22    import dbflib
23  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24                                 FIELDTYPE_INT                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
27    
# Line 50  class TestTransientTable(unittest.TestCa Line 51  class TestTransientTable(unittest.TestCa
51          self.assertEquals(table.NumRows(), 156)          self.assertEquals(table.NumRows(), 156)
52          self.assertEquals(table.NumColumns(), 8)          self.assertEquals(table.NumColumns(), 8)
53    
54          # Check one each of the possible field types. The width and          # Check one each of the possible field types.
         # decimal precision is always 0.  
55          columns = table.Columns()          columns = table.Columns()
56          self.assertEquals(columns[0].name, 'AREA')          self.assertEquals(columns[0].name, 'AREA')
57          self.assertEquals(columns[0].type, 'double')          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
58          self.assertEquals(columns[3].name, 'PONET_ID')          self.assertEquals(columns[3].name, 'PONET_ID')
59          self.assertEquals(columns[3].type, 'int')          self.assertEquals(columns[3].type, FIELDTYPE_INT)
60          self.assertEquals(columns[6].name, 'POPYCOUN')          self.assertEquals(columns[6].name, 'POPYCOUN')
61          self.assertEquals(columns[6].type, 'string')          self.assertEquals(columns[6].type, FIELDTYPE_STRING)
62    
63          # Read an `interesting' record          # HasColumn
64            self.failUnless(table.HasColumn("AREA"))
65            self.failUnless(table.HasColumn(1))
66            # HasColumn for non-exisiting columns
67            self.failIf(table.HasColumn("non_existing_name"))
68            self.failIf(table.HasColumn(100))
69    
70            # Reading rows and values.
71          self.assertEquals(table.ReadRowAsDict(144),          self.assertEquals(table.ReadRowAsDict(144),
72                            {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,                            {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
73                             'AREA': 19.462,                             'AREA': 19.462,
74                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
75                             'POPYREG': '1',                             'POPYREG': '1',
76                             'PONET_ID': 145})                             'PONET_ID': 145})
77            self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
78            self.assertEquals(table.ReadValue(144, 3), 145)
79    
80          # field_range may induce a copy to the transient database.          # ValueRange may induce a copy to the transient database.
81          # Therefore we put it last so that we can execute this method          # Therefore we put it last so that we can execute this method
82          # twice to check whether the other methods still work after the          # twice to check whether the other methods still work after the
83          # copy          # copy
# Line 92  class TestTransientTable(unittest.TestCa Line 101  class TestTransientTable(unittest.TestCa
101          # The transient_table method should return the table itself          # The transient_table method should return the table itself
102          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
103    
104            # The title is simply copied over from the original table
105            self.assertEquals(table.Title(), orig_table.Title())
106    
107            # The TransientTable class itself doesn't implement the
108            # Dependencies method, so we don't test it.
109    
110    
111      def test_auto_transient_table(self):      def test_auto_transient_table(self):
112          """Test AutoTransientTable(dbftable)          """Test AutoTransientTable(dbftable)
# Line 111  class TestTransientTable(unittest.TestCa Line 126  class TestTransientTable(unittest.TestCa
126          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
127          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
128    
129        def test_auto_transient_table_query(self):
130            """Test AutoTransientTable.SimpleQuery()"""
131            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
132                                               "political.dbf"))
133            table = AutoTransientTable(self.transientdb, orig_table)
134            # Only a simple test here. The AutoTransientTable simply
135            # delegates to its transient table so it should be OK that the
136            # real test for it is in test_transient_table_query. However,
137            # it's important to check that the column handling works
138            # correctly because the AutoTransientTable and it's underlying
139            # transient table use different column object types.
140            self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
141                              [144])
142    
143            # test using a Column object as the right parameter
144            self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
145                                                "==",
146                                                table.Column("POPYREG")),
147                              range(156))
148    
149        def test_auto_transient_table_dependencies(self):
150            """Test AutoTransientTable.Dependencies()"""
151            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
152                                               "political.dbf"))
153            table = AutoTransientTable(self.transientdb, orig_table)
154            self.assertEquals(table.Dependencies(), (orig_table,))
155    
156        def test_auto_transient_table_title(self):
157            """Test AutoTransientTable.Title()"""
158            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
159                                               "political.dbf"))
160            table = AutoTransientTable(self.transientdb, orig_table)
161            # The title is of course the same as that of the original table
162            self.assertEquals(table.Title(), orig_table.Title())
163    
164      def test_transient_joined_table(self):      def test_transient_joined_table(self):
165          """Test TransientJoinedTable"""          """Test TransientJoinedTable"""
# Line 132  class TestTransientTable(unittest.TestCa Line 181  class TestTransientTable(unittest.TestCa
181    
182          self.assertEquals(table.NumRows(), 34)          self.assertEquals(table.NumRows(), 34)
183          self.assertEquals(table.NumColumns(), 8)          self.assertEquals(table.NumColumns(), 8)
184          self.assertEquals(table.field_info(0), ('double', 'AREA', 0, 0))          self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
185          self.assertEquals(table.field_info(7), ('int', 'code', 0, 0))          self.assertEquals(table.Column(0).name, 'AREA')
186          self.assertEquals(table.field_info(4), ('string', 'CLPTLABEL', 0, 0))          self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
187            self.assertEquals(table.Column(7).name, 'code')
188            self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
189            self.assertEquals(table.Column(4).name, 'CLPTLABEL')
190            # HasColumn
191            self.failUnless(table.HasColumn("AREA"))
192            self.failUnless(table.HasColumn(1))
193            # HasColumn for non-exisiting columns
194            self.failIf(table.HasColumn("non_existing_name"))
195            self.failIf(table.HasColumn(100))
196    
197          # Read an `interesting' record          # Reading rows and values
198          self.assertEquals(table.read_record(22),          self.assertEquals(table.ReadRowAsDict(22),
199                            {'PERIMETER': 0.0, 'CLPOINT_': 23,                            {'PERIMETER': 0.0, 'CLPOINT_': 23,
200                             'AREA': 0.0, 'CLPTLABEL': 'RUINS',                             'AREA': 0.0, 'CLPTLABEL': 'RUINS',
201                             'CLPOINT_ID': 38, 'CLPTFLAG': 0,                             'CLPOINT_ID': 38, 'CLPTFLAG': 0,
202                             'code': 1, 'type': 'RUINS'})                             'code': 1, 'type': 'RUINS'})
203            self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
204            self.assertEquals(table.ReadValue(22, 7), 1)
205    
206          # The transient_table method should return the table itself          # The transient_table method should return the table itself
207          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
208    
209            # The TransientJoinedTable depends on both input tables
210            self.assertEquals(table.Dependencies(), (landmarks, auto))
211    
212            # The title is constructed from the titles of the input tables.
213            self.assertEquals(table.Title(),
214                              "Join of %s and %s" % (landmarks.Title(),
215                                                     auto.Title()))
216    
217    
218        def test_transient_joined_table_same_column_name(self):
219            """Test TransientJoinedTable join on columns with same name
220    
221            The transient DB maps the column names used by the tables to
222            another set of names used only inside the SQLite database. There
223            was a bug in the way this mapping was used when joining on
224            fields with the same names in both tables so that the joined
225            table ended up joining on the same column in the same table.
226            """
227            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
228                                        [(i,) for i in range(4)])
229            stretches = AutoTransientTable(self.transientdb, mem_stretches)
230    
231            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
232                                          ("stretch_id", FIELDTYPE_INT)],
233                                         [(1, 0), (2, 3)])
234            discharges = AutoTransientTable(self.transientdb, mem_discharges)
235    
236            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
237                                         discharges, "stretch_id",
238                                         outer_join = True)
239    
240            self.assertEquals(table.NumRows(), 4)
241            self.assertEquals(table.NumColumns(), 3)
242    
243            # HasColumn
244            self.failUnless(table.HasColumn("stretch_id"))
245            self.failUnless(table.HasColumn("disch_id"))
246    
247    
248        def test_transient_joined_table_with_equal_column_names(self):
249            """Test TransientJoinedTable join on tables with equal column names
250    
251            If a name collision occurs for the field names, underscores are
252            appended as long as any collision is resolved.
253            """
254            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
255                                         ("name", FIELDTYPE_INT)],
256                                        [(0, 10), (1, 11), (2, 12), (3, 13) ])
257            stretches = AutoTransientTable(self.transientdb, mem_stretches)
258    
259            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
260                                          ("stretch_id", FIELDTYPE_INT),
261                                          ("name", FIELDTYPE_INT)],
262                                         [(1, 0, 1), (2, 3, 2)])
263            discharges = AutoTransientTable(self.transientdb, mem_discharges)
264    
265            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
266                                         discharges, "stretch_id",
267                                         outer_join = True)
268    
269            self.assertEquals(table.NumRows(), 4)
270            self.assertEquals(table.NumColumns(), 5)
271    
272            # HasColumn
273            self.assertEquals([c.name for c in table.Columns()],
274                              ["stretch_id", "name", "disch_id", "stretch_id_",
275                               "name_"])
276    
277        def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
278            """Test TransientJoinedTable name-collisions do not modifying in place
279    
280            The name collision work-around by appending underscores
281            accidentally modified the column objects in place. We do two
282            joins therefore in reverse order to detect this: The first join
283            will lead to a modified name in the column object of the right
284            table which is then used as the left table in the second join so
285            the underscored name will appear before the non-underscored one
286            in the list of column names after the second join.
287            """
288            mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
289                                ("name", FIELDTYPE_INT)],
290                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
291            table1 = AutoTransientTable(self.transientdb, mem1)
292    
293            mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
294                                ("name", FIELDTYPE_INT)],
295                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
296            table2 = AutoTransientTable(self.transientdb, mem2)
297    
298            table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
299                                         table2, "stretch_id", outer_join = True)
300            table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
301                                         table1, "stretch_id", outer_join = True)
302    
303            self.assertEquals([c.name for c in table.Columns()],
304                              ["stretch_id", "name", "stretch_id_", "name_"])
305    
306      def test_transient_table_read_twice(self):      def test_transient_table_read_twice(self):
307          """Test TransientTable.read_record() reading the same record twice"""          """Test TransientTable.ReadRowAsDict() reading the same record twice"""
308          simple = MemoryTable([("type", FIELDTYPE_STRING),          simple = MemoryTable([("type", FIELDTYPE_STRING),
309                                ("code", FIELDTYPE_INT)],                                ("code", FIELDTYPE_INT)],
310                               [("OTHER/UNKNOWN", 0),                               [("OTHER/UNKNOWN", 0),
# Line 164  class TestTransientTable(unittest.TestCa Line 320  class TestTransientTable(unittest.TestCa
320          # unitialized local variable, so for passing the test it's          # unitialized local variable, so for passing the test it's
321          # enough if reading simply succeeds. OTOH, while we're at it we          # enough if reading simply succeeds. OTOH, while we're at it we
322          # might as well check whether the results are equal anyway :)          # might as well check whether the results are equal anyway :)
323          result1 = table.read_record(3)          result1 = table.ReadRowAsDict(3)
324          result2 = table.read_record(3)          result2 = table.ReadRowAsDict(3)
325          self.assertEquals(result1, result2)          self.assertEquals(result1, result2)
326    
327    
328        def test_transient_table_query(self):
329            """Test TransientTable.SimpleQuery()"""
330            simple = MemoryTable([("type", FIELDTYPE_STRING),
331                                  ("value", FIELDTYPE_DOUBLE),
332                                  ("code", FIELDTYPE_INT)],
333                                 [("OTHER/UNKNOWN", -1.5, 11),
334                                  ("RUINS", 0.0, 1),
335                                  ("FARM", 3.141, 2),
336                                  ("BUILDING", 2.5, 3),
337                                  ("HUT", 1e6, 4),
338                                  ("LIGHTHOUSE", -0.01, 5)])
339            table = TransientTable(self.transientdb, simple)
340    
341            # A column and a value
342            self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
343                              [1])
344            self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
345                              [0, 1, 3, 4, 5])
346            self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
347                              [0, 1, 5])
348            self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
349                              [0])
350            self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
351                              [0, 4, 5])
352            self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
353                              [0, 3, 4, 5])
354    
355            # Two columns as operands
356            self.assertEquals(table.SimpleQuery(table.Column(1),
357                                                "<=", table.Column(2)),
358                              [0, 1, 3, 5])
359    
360            # Test whether invalid operators raise a ValueError
361            self.assertRaises(ValueError,
362                              table.SimpleQuery,
363                              table.Column(1), "<<", table.Column(2))
364    
365        def test_transienttable_to_dbf(self):
366            memtable = MemoryTable([("type", FIELDTYPE_STRING),
367                                    ("value", FIELDTYPE_DOUBLE),
368                                    ("code", FIELDTYPE_INT)],
369                                   [("UNKNOWN", 0.0, 0),
370                                    ("Foo", 0.5, -1),
371                                    ("Foo", 1.0/256, 100),
372                                    ("bar", 1e10, 17)])
373            table = TransientTable(self.transientdb, memtable)
374            filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
375            table_to_dbf(table, filename)
376    
377            dbf = dbflib.DBFFile(filename)
378            self.assertEquals(dbf.read_record(2),
379                              {'code': 100, 'type': 'Foo', 'value': 0.00390625})
380            self.assertEquals(dbf.field_count(), 3)
381            self.assertEquals(dbf.record_count(), 4)
382            self.assertEquals(dbf.field_info(0),
383                              (dbflib.FTString, "type", 7, 0))
384            self.assertEquals(dbf.field_info(1),
385                              (dbflib.FTDouble, "value", 24, 12))
386            self.assertEquals(dbf.field_info(2),
387                              (dbflib.FTInteger, "code", 3, 0))
388    
389    
390    
391  if __name__ == "__main__":  if __name__ == "__main__":
392      support.run_tests()      support.run_tests()

Legend:
Removed from v.818  
changed lines
  Added in v.1381

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26