/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 765 by bh, Tue Apr 29 12:42:14 2003 UTC revision 1662 by bh, Wed Aug 27 13:51:01 2003 UTC
# Line 19  import unittest Line 19  import unittest
19  import support  import support
20  support.initthuban()  support.initthuban()
21    
22  from Thuban.Model.table import DBFTable, FIELDTYPE_STRING, FIELDTYPE_INT  import dbflib
23    from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
24                                   FIELDTYPE_INT, FIELDTYPE_DOUBLE, table_to_dbf
25  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
26       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
27    
28    
 class SimpleTable:  
   
     """Very simple table implementation that operates on a list of tuples"""  
   
     def __init__(self, fields, data):  
         """Initialize the SimpleTable  
   
         Parameters:  
         fields -- List of (name, field_type) pairs  
         data -- List of tuples, one for each row of data  
         """  
         self.fields = fields  
         self.data = data  
   
     def field_count(self):  
         return len(self.fields)  
   
     def field_info(self, index):  
         name, type = self.fields[index]  
         return (type, name, 0, 0)  
   
     def record_count(self):  
         return len(self.data)  
   
     def read_record(self, index):  
         return dict([(self.fields[i][0], self.data[index][i])  
                       for i in range(len(self.fields))])  
   
   
29  class TestTransientTable(unittest.TestCase, support.FileTestMixin):  class TestTransientTable(unittest.TestCase, support.FileTestMixin):
30    
31      def setUp(self):      def setUp(self):
# Line 75  class TestTransientTable(unittest.TestCa Line 48  class TestTransientTable(unittest.TestCa
48          Assume that table holds the data of the file          Assume that table holds the data of the file
49          ../Data/iceland/political.dbf sample file.          ../Data/iceland/political.dbf sample file.
50          """          """
51          self.assertEquals(table.record_count(), 156)          self.assertEquals(table.NumRows(), 156)
52          self.assertEquals(table.field_count(), 8)          self.assertEquals(table.NumColumns(), 8)
53    
54          # Check one each of the possible field types. The width and          # Check one each of the possible field types.
55          # decimal precision is always 0.          columns = table.Columns()
56          self.assertEquals(table.field_info(0), ('double', 'AREA', 0, 0))          self.assertEquals(columns[0].name, 'AREA')
57          self.assertEquals(table.field_info(3), ('int', 'PONET_ID', 0, 0))          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
58          self.assertEquals(table.field_info(6), ('string', 'POPYCOUN', 0, 0))          self.assertEquals(columns[3].name, 'PONET_ID')
59            self.assertEquals(columns[3].type, FIELDTYPE_INT)
60            self.assertEquals(columns[6].name, 'POPYCOUN')
61            self.assertEquals(columns[6].type, FIELDTYPE_STRING)
62    
63            # HasColumn
64            self.failUnless(table.HasColumn("AREA"))
65            self.failUnless(table.HasColumn(1))
66            # HasColumn for non-exisiting columns
67            self.failIf(table.HasColumn("non_existing_name"))
68            self.failIf(table.HasColumn(100))
69    
70          # Read an `interesting' record          # Reading rows and values.
71          self.assertEquals(table.read_record(144),          self.assertEquals(table.ReadRowAsDict(144),
72                            {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,                            {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
73                             'AREA': 19.462,                             'AREA': 19.462,
74                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
75                             'POPYREG': '1',                             'POPYREG': '1',
76                             'PONET_ID': 145})                             'PONET_ID': 145})
77            self.assertEquals(table.ReadRowAsDict(144, row_is_ordinal = 1),
78                              {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
79                               'AREA': 19.462,
80                               'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
81                               'POPYREG': '1',
82                               'PONET_ID': 145})
83            self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
84            self.assertEquals(table.ReadValue(144, 3), 145)
85            self.assertEquals(table.ReadValue(144, "AREA", row_is_ordinal = 1),
86                              19.462)
87            self.assertEquals(table.ReadValue(144, 3, row_is_ordinal = 1), 145)
88    
89            self.assertEquals(table.RowIdToOrdinal(23), 23)
90            self.assertEquals(table.RowOrdinalToId(23), 23)
91    
92          # field_range may induce a copy to the transient database.          # ValueRange may induce a copy to the transient database.
93          # Therefore we put it last so that we can execute this method          # Therefore we put it last so that we can execute this method
94          # twice to check whether the other methods still work after the          # twice to check whether the other methods still work after the
95          # copy          # copy
96          self.assertEquals(table.field_range("AREA"),          self.assertEquals(table.ValueRange("AREA"), (0.0, 19.462))
                           ((0.0, None), (19.462, None)))  
97    
98          unique = table.GetUniqueValues("PONET_ID")          unique = table.UniqueValues("PONET_ID")
99          unique.sort()          unique.sort()
100          self.assertEquals(unique, range(1, 157))          self.assertEquals(unique, range(1, 157))
101    
# Line 117  class TestTransientTable(unittest.TestCa Line 113  class TestTransientTable(unittest.TestCa
113          # The transient_table method should return the table itself          # The transient_table method should return the table itself
114          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
115    
116            # The title is simply copied over from the original table
117            self.assertEquals(table.Title(), orig_table.Title())
118    
119            # The TransientTable class itself doesn't implement the
120            # Dependencies method, so we don't test it.
121    
122    
123      def test_auto_transient_table(self):      def test_auto_transient_table(self):
124          """Test AutoTransientTable(dbftable)          """Test AutoTransientTable(dbftable)
# Line 136  class TestTransientTable(unittest.TestCa Line 138  class TestTransientTable(unittest.TestCa
138          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
139          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
140    
141        def test_auto_transient_table_query(self):
142            """Test AutoTransientTable.SimpleQuery()"""
143            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
144                                               "political.dbf"))
145            table = AutoTransientTable(self.transientdb, orig_table)
146            # Only a simple test here. The AutoTransientTable simply
147            # delegates to its transient table so it should be OK that the
148            # real test for it is in test_transient_table_query. However,
149            # it's important to check that the column handling works
150            # correctly because the AutoTransientTable and it's underlying
151            # transient table use different column object types.
152            self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
153                              [144])
154    
155            # test using a Column object as the right parameter
156            self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
157                                                "==",
158                                                table.Column("POPYREG")),
159                              range(156))
160    
161        def test_auto_transient_table_dependencies(self):
162            """Test AutoTransientTable.Dependencies()"""
163            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
164                                               "political.dbf"))
165            table = AutoTransientTable(self.transientdb, orig_table)
166            self.assertEquals(table.Dependencies(), (orig_table,))
167    
168        def test_auto_transient_table_title(self):
169            """Test AutoTransientTable.Title()"""
170            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
171                                               "political.dbf"))
172            table = AutoTransientTable(self.transientdb, orig_table)
173            # The title is of course the same as that of the original table
174            self.assertEquals(table.Title(), orig_table.Title())
175    
176      def test_transient_joined_table(self):      def test_transient_joined_table(self):
177          """Test TransientJoinedTable"""          """Test TransientJoinedTable"""
178          simple = SimpleTable([("type", FIELDTYPE_STRING),          simple = MemoryTable([("type", FIELDTYPE_STRING),
179                                ("code", FIELDTYPE_INT)],                                ("code", FIELDTYPE_INT)],
180                               [("OTHER/UNKNOWN", 0),                               [("OTHER/UNKNOWN", 0),
181                                ("RUINS", 1),                                ("RUINS", 1),
# Line 155  class TestTransientTable(unittest.TestCa Line 191  class TestTransientTable(unittest.TestCa
191          table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",          table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
192                                       auto, "type")                                       auto, "type")
193    
194          self.assertEquals(table.record_count(), 34)          self.assertEquals(table.NumRows(), 34)
195          self.assertEquals(table.field_count(), 8)          self.assertEquals(table.NumColumns(), 8)
196          self.assertEquals(table.field_info(0), ('double', 'AREA', 0, 0))          self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
197          self.assertEquals(table.field_info(7), ('int', 'code', 0, 0))          self.assertEquals(table.Column(0).name, 'AREA')
198          self.assertEquals(table.field_info(4), ('string', 'CLPTLABEL', 0, 0))          self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
199            self.assertEquals(table.Column(7).name, 'code')
200            self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
201            self.assertEquals(table.Column(4).name, 'CLPTLABEL')
202            # HasColumn
203            self.failUnless(table.HasColumn("AREA"))
204            self.failUnless(table.HasColumn(1))
205            # HasColumn for non-exisiting columns
206            self.failIf(table.HasColumn("non_existing_name"))
207            self.failIf(table.HasColumn(100))
208    
209          # Read an `interesting' record          # Reading rows and values
210          self.assertEquals(table.read_record(22),          self.assertEquals(table.ReadRowAsDict(22),
211                            {'PERIMETER': 0.0, 'CLPOINT_': 23,                            {'PERIMETER': 0.0, 'CLPOINT_': 23,
212                             'AREA': 0.0, 'CLPTLABEL': 'RUINS',                             'AREA': 0.0, 'CLPTLABEL': 'RUINS',
213                             'CLPOINT_ID': 38, 'CLPTFLAG': 0,                             'CLPOINT_ID': 38, 'CLPTFLAG': 0,
214                             'code': 1, 'type': 'RUINS'})                             'code': 1, 'type': 'RUINS'})
215            self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
216            self.assertEquals(table.ReadValue(22, 7), 1)
217            self.assertEquals(table.ReadValue(22, "type", row_is_ordinal = 1),
218                              "RUINS")
219            self.assertEquals(table.ReadValue(22, 7, row_is_ordinal = 1), 1)
220            self.assertEquals(table.RowIdToOrdinal(23), 23)
221            self.assertEquals(table.RowOrdinalToId(23), 23)
222    
223          # The transient_table method should return the table itself          # The transient_table method should return the table itself
224          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
225    
226            # The TransientJoinedTable depends on both input tables
227            self.assertEquals(table.Dependencies(), (landmarks, auto))
228    
229            # The title is constructed from the titles of the input tables.
230            self.assertEquals(table.Title(),
231                              "Join of %s and %s" % (landmarks.Title(),
232                                                     auto.Title()))
233    
234    
235        def test_transient_joined_table_same_column_name(self):
236            """Test TransientJoinedTable join on columns with same name
237    
238            The transient DB maps the column names used by the tables to
239            another set of names used only inside the SQLite database. There
240            was a bug in the way this mapping was used when joining on
241            fields with the same names in both tables so that the joined
242            table ended up joining on the same column in the same table.
243            """
244            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
245                                        [(i,) for i in range(4)])
246            stretches = AutoTransientTable(self.transientdb, mem_stretches)
247    
248            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
249                                          ("stretch_id", FIELDTYPE_INT)],
250                                         [(1, 0), (2, 3)])
251            discharges = AutoTransientTable(self.transientdb, mem_discharges)
252    
253            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
254                                         discharges, "stretch_id",
255                                         outer_join = True)
256    
257            self.assertEquals(table.NumRows(), 4)
258            self.assertEquals(table.NumColumns(), 3)
259    
260            # HasColumn
261            self.failUnless(table.HasColumn("stretch_id"))
262            self.failUnless(table.HasColumn("disch_id"))
263    
264    
265        def test_transient_joined_table_with_equal_column_names(self):
266            """Test TransientJoinedTable join on tables with equal column names
267    
268            If a name collision occurs for the field names, underscores are
269            appended as long as any collision is resolved.
270            """
271            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
272                                         ("name", FIELDTYPE_INT)],
273                                        [(0, 10), (1, 11), (2, 12), (3, 13) ])
274            stretches = AutoTransientTable(self.transientdb, mem_stretches)
275    
276            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
277                                          ("stretch_id", FIELDTYPE_INT),
278                                          ("name", FIELDTYPE_INT)],
279                                         [(1, 0, 1), (2, 3, 2)])
280            discharges = AutoTransientTable(self.transientdb, mem_discharges)
281    
282            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
283                                         discharges, "stretch_id",
284                                         outer_join = True)
285    
286            self.assertEquals(table.NumRows(), 4)
287            self.assertEquals(table.NumColumns(), 5)
288    
289            # HasColumn
290            self.assertEquals([c.name for c in table.Columns()],
291                              ["stretch_id", "name", "disch_id", "stretch_id_",
292                               "name_"])
293    
294        def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
295            """Test TransientJoinedTable name-collisions do not modifying in place
296    
297            The name collision work-around by appending underscores
298            accidentally modified the column objects in place. We do two
299            joins therefore in reverse order to detect this: The first join
300            will lead to a modified name in the column object of the right
301            table which is then used as the left table in the second join so
302            the underscored name will appear before the non-underscored one
303            in the list of column names after the second join.
304            """
305            mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
306                                ("name", FIELDTYPE_INT)],
307                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
308            table1 = AutoTransientTable(self.transientdb, mem1)
309    
310            mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
311                                ("name", FIELDTYPE_INT)],
312                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
313            table2 = AutoTransientTable(self.transientdb, mem2)
314    
315            table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
316                                         table2, "stretch_id", outer_join = True)
317            table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
318                                         table1, "stretch_id", outer_join = True)
319    
320            self.assertEquals([c.name for c in table.Columns()],
321                              ["stretch_id", "name", "stretch_id_", "name_"])
322    
323        def test_transient_table_read_twice(self):
324            """Test TransientTable.ReadRowAsDict() reading the same record twice"""
325            simple = MemoryTable([("type", FIELDTYPE_STRING),
326                                  ("code", FIELDTYPE_INT)],
327                                 [("OTHER/UNKNOWN", 0),
328                                  ("RUINS", 1),
329                                  ("FARM", 2),
330                                  ("BUILDING", 3),
331                                  ("HUT", 4),
332                                  ("LIGHTHOUSE", 5)])
333            table = TransientTable(self.transientdb, simple)
334    
335            # There was a bug where reading the same record twice would
336            # raise an exception in the second call because of an
337            # unitialized local variable, so for passing the test it's
338            # enough if reading simply succeeds. OTOH, while we're at it we
339            # might as well check whether the results are equal anyway :)
340            result1 = table.ReadRowAsDict(3)
341            result2 = table.ReadRowAsDict(3)
342            self.assertEquals(result1, result2)
343    
344    
345        def test_transient_table_query(self):
346            """Test TransientTable.SimpleQuery()"""
347            simple = MemoryTable([("type", FIELDTYPE_STRING),
348                                  ("value", FIELDTYPE_DOUBLE),
349                                  ("code", FIELDTYPE_INT)],
350                                 [("OTHER/UNKNOWN", -1.5, 11),
351                                  ("RUINS", 0.0, 1),
352                                  ("FARM", 3.141, 2),
353                                  ("BUILDING", 2.5, 3),
354                                  ("HUT", 1e6, 4),
355                                  ("LIGHTHOUSE", -0.01, 5)])
356            table = TransientTable(self.transientdb, simple)
357    
358            # A column and a value
359            self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
360                              [1])
361            self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
362                              [0, 1, 3, 4, 5])
363            self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
364                              [0, 1, 5])
365            self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
366                              [0])
367            self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
368                              [0, 4, 5])
369            self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
370                              [0, 3, 4, 5])
371    
372            # Two columns as operands
373            self.assertEquals(table.SimpleQuery(table.Column(1),
374                                                "<=", table.Column(2)),
375                              [0, 1, 3, 5])
376    
377            # Test whether invalid operators raise a ValueError
378            self.assertRaises(ValueError,
379                              table.SimpleQuery,
380                              table.Column(1), "<<", table.Column(2))
381    
382        def test_transienttable_to_dbf(self):
383            memtable = MemoryTable([("type", FIELDTYPE_STRING),
384                                    ("value", FIELDTYPE_DOUBLE),
385                                    ("code", FIELDTYPE_INT)],
386                                   [("UNKNOWN", 0.0, 0),
387                                    ("Foo", 0.5, -1),
388                                    ("Foo", 1.0/256, 100),
389                                    ("bar", 1e10, 17)])
390            table = TransientTable(self.transientdb, memtable)
391            filename = self.temp_file_name("test_transienttable_to_dbf.dbf")
392            table_to_dbf(table, filename)
393    
394            dbf = dbflib.DBFFile(filename)
395            self.assertEquals(dbf.read_record(2),
396                              {'code': 100, 'type': 'Foo', 'value': 0.00390625})
397            self.assertEquals(dbf.field_count(), 3)
398            self.assertEquals(dbf.record_count(), 4)
399            self.assertEquals(dbf.field_info(0),
400                              (dbflib.FTString, "type", 7, 0))
401            self.assertEquals(dbf.field_info(1),
402                              (dbflib.FTDouble, "value", 24, 12))
403            self.assertEquals(dbf.field_info(2),
404                              (dbflib.FTInteger, "code", 3, 0))
405    
406    
407    
408  if __name__ == "__main__":  if __name__ == "__main__":

Legend:
Removed from v.765  
changed lines
  Added in v.1662

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26