/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 765 by bh, Tue Apr 29 12:42:14 2003 UTC revision 1364 by bh, Thu Jul 3 13:09:43 2003 UTC
# Line 19  import unittest Line 19  import unittest
19  import support  import support
20  support.initthuban()  support.initthuban()
21    
22  from Thuban.Model.table import DBFTable, FIELDTYPE_STRING, FIELDTYPE_INT  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
23                                   FIELDTYPE_INT, FIELDTYPE_DOUBLE
24  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
25       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
26    
27    
 class SimpleTable:  
   
     """Very simple table implementation that operates on a list of tuples"""  
   
     def __init__(self, fields, data):  
         """Initialize the SimpleTable  
   
         Parameters:  
         fields -- List of (name, field_type) pairs  
         data -- List of tuples, one for each row of data  
         """  
         self.fields = fields  
         self.data = data  
   
     def field_count(self):  
         return len(self.fields)  
   
     def field_info(self, index):  
         name, type = self.fields[index]  
         return (type, name, 0, 0)  
   
     def record_count(self):  
         return len(self.data)  
   
     def read_record(self, index):  
         return dict([(self.fields[i][0], self.data[index][i])  
                       for i in range(len(self.fields))])  
   
   
28  class TestTransientTable(unittest.TestCase, support.FileTestMixin):  class TestTransientTable(unittest.TestCase, support.FileTestMixin):
29    
30      def setUp(self):      def setUp(self):
# Line 75  class TestTransientTable(unittest.TestCa Line 47  class TestTransientTable(unittest.TestCa
47          Assume that table holds the data of the file          Assume that table holds the data of the file
48          ../Data/iceland/political.dbf sample file.          ../Data/iceland/political.dbf sample file.
49          """          """
50          self.assertEquals(table.record_count(), 156)          self.assertEquals(table.NumRows(), 156)
51          self.assertEquals(table.field_count(), 8)          self.assertEquals(table.NumColumns(), 8)
52    
53          # Check one each of the possible field types. The width and          # Check one each of the possible field types.
54          # decimal precision is always 0.          columns = table.Columns()
55          self.assertEquals(table.field_info(0), ('double', 'AREA', 0, 0))          self.assertEquals(columns[0].name, 'AREA')
56          self.assertEquals(table.field_info(3), ('int', 'PONET_ID', 0, 0))          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
57          self.assertEquals(table.field_info(6), ('string', 'POPYCOUN', 0, 0))          self.assertEquals(columns[3].name, 'PONET_ID')
58            self.assertEquals(columns[3].type, FIELDTYPE_INT)
59            self.assertEquals(columns[6].name, 'POPYCOUN')
60            self.assertEquals(columns[6].type, FIELDTYPE_STRING)
61    
62            # HasColumn
63            self.failUnless(table.HasColumn("AREA"))
64            self.failUnless(table.HasColumn(1))
65            # HasColumn for non-exisiting columns
66            self.failIf(table.HasColumn("non_existing_name"))
67            self.failIf(table.HasColumn(100))
68    
69          # Read an `interesting' record          # Reading rows and values.
70          self.assertEquals(table.read_record(144),          self.assertEquals(table.ReadRowAsDict(144),
71                            {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,                            {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
72                             'AREA': 19.462,                             'AREA': 19.462,
73                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
74                             'POPYREG': '1',                             'POPYREG': '1',
75                             'PONET_ID': 145})                             'PONET_ID': 145})
76            self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
77            self.assertEquals(table.ReadValue(144, 3), 145)
78    
79          # field_range may induce a copy to the transient database.          # ValueRange may induce a copy to the transient database.
80          # Therefore we put it last so that we can execute this method          # Therefore we put it last so that we can execute this method
81          # twice to check whether the other methods still work after the          # twice to check whether the other methods still work after the
82          # copy          # copy
83          self.assertEquals(table.field_range("AREA"),          self.assertEquals(table.ValueRange("AREA"), (0.0, 19.462))
                           ((0.0, None), (19.462, None)))  
84    
85          unique = table.GetUniqueValues("PONET_ID")          unique = table.UniqueValues("PONET_ID")
86          unique.sort()          unique.sort()
87          self.assertEquals(unique, range(1, 157))          self.assertEquals(unique, range(1, 157))
88    
# Line 117  class TestTransientTable(unittest.TestCa Line 100  class TestTransientTable(unittest.TestCa
100          # The transient_table method should return the table itself          # The transient_table method should return the table itself
101          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
102    
103            # The title is simply copied over from the original table
104            self.assertEquals(table.Title(), orig_table.Title())
105    
106            # The TransientTable class itself doesn't implement the
107            # Dependencies method, so we don't test it.
108    
109    
110      def test_auto_transient_table(self):      def test_auto_transient_table(self):
111          """Test AutoTransientTable(dbftable)          """Test AutoTransientTable(dbftable)
# Line 136  class TestTransientTable(unittest.TestCa Line 125  class TestTransientTable(unittest.TestCa
125          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
126          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
127    
128        def test_auto_transient_table_query(self):
129            """Test AutoTransientTable.SimpleQuery()"""
130            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
131                                               "political.dbf"))
132            table = AutoTransientTable(self.transientdb, orig_table)
133            # Only a simple test here. The AutoTransientTable simply
134            # delegates to its transient table so it should be OK that the
135            # real test for it is in test_transient_table_query. However,
136            # it's important to check that the column handling works
137            # correctly because the AutoTransientTable and it's underlying
138            # transient table use different column object types.
139            self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
140                              [144])
141    
142            # test using a Column object as the right parameter
143            self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
144                                                "==",
145                                                table.Column("POPYREG")),
146                              range(156))
147    
148        def test_auto_transient_table_dependencies(self):
149            """Test AutoTransientTable.Dependencies()"""
150            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
151                                               "political.dbf"))
152            table = AutoTransientTable(self.transientdb, orig_table)
153            self.assertEquals(table.Dependencies(), (orig_table,))
154    
155        def test_auto_transient_table_title(self):
156            """Test AutoTransientTable.Title()"""
157            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
158                                               "political.dbf"))
159            table = AutoTransientTable(self.transientdb, orig_table)
160            # The title is of course the same as that of the original table
161            self.assertEquals(table.Title(), orig_table.Title())
162    
163      def test_transient_joined_table(self):      def test_transient_joined_table(self):
164          """Test TransientJoinedTable"""          """Test TransientJoinedTable"""
165          simple = SimpleTable([("type", FIELDTYPE_STRING),          simple = MemoryTable([("type", FIELDTYPE_STRING),
166                                ("code", FIELDTYPE_INT)],                                ("code", FIELDTYPE_INT)],
167                               [("OTHER/UNKNOWN", 0),                               [("OTHER/UNKNOWN", 0),
168                                ("RUINS", 1),                                ("RUINS", 1),
# Line 155  class TestTransientTable(unittest.TestCa Line 178  class TestTransientTable(unittest.TestCa
178          table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",          table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
179                                       auto, "type")                                       auto, "type")
180    
181          self.assertEquals(table.record_count(), 34)          self.assertEquals(table.NumRows(), 34)
182          self.assertEquals(table.field_count(), 8)          self.assertEquals(table.NumColumns(), 8)
183          self.assertEquals(table.field_info(0), ('double', 'AREA', 0, 0))          self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
184          self.assertEquals(table.field_info(7), ('int', 'code', 0, 0))          self.assertEquals(table.Column(0).name, 'AREA')
185          self.assertEquals(table.field_info(4), ('string', 'CLPTLABEL', 0, 0))          self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
186            self.assertEquals(table.Column(7).name, 'code')
187            self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
188            self.assertEquals(table.Column(4).name, 'CLPTLABEL')
189            # HasColumn
190            self.failUnless(table.HasColumn("AREA"))
191            self.failUnless(table.HasColumn(1))
192            # HasColumn for non-exisiting columns
193            self.failIf(table.HasColumn("non_existing_name"))
194            self.failIf(table.HasColumn(100))
195    
196          # Read an `interesting' record          # Reading rows and values
197          self.assertEquals(table.read_record(22),          self.assertEquals(table.ReadRowAsDict(22),
198                            {'PERIMETER': 0.0, 'CLPOINT_': 23,                            {'PERIMETER': 0.0, 'CLPOINT_': 23,
199                             'AREA': 0.0, 'CLPTLABEL': 'RUINS',                             'AREA': 0.0, 'CLPTLABEL': 'RUINS',
200                             'CLPOINT_ID': 38, 'CLPTFLAG': 0,                             'CLPOINT_ID': 38, 'CLPTFLAG': 0,
201                             'code': 1, 'type': 'RUINS'})                             'code': 1, 'type': 'RUINS'})
202            self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
203            self.assertEquals(table.ReadValue(22, 7), 1)
204    
205          # The transient_table method should return the table itself          # The transient_table method should return the table itself
206          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
207    
208            # The TransientJoinedTable depends on both input tables
209            self.assertEquals(table.Dependencies(), (landmarks, auto))
210    
211            # The title is constructed from the titles of the input tables.
212            self.assertEquals(table.Title(),
213                              "Join of %s and %s" % (landmarks.Title(),
214                                                     auto.Title()))
215    
216    
217        def test_transient_joined_table_same_column_name(self):
218            """Test TransientJoinedTable join on columns with same name
219    
220            The transient DB maps the column names used by the tables to
221            another set of names used only inside the SQLite database. There
222            was a bug in the way this mapping was used when joining on
223            fields with the same names in both tables so that the joined
224            table ended up joining on the same column in the same table.
225            """
226            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
227                                        [(i,) for i in range(4)])
228            stretches = AutoTransientTable(self.transientdb, mem_stretches)
229    
230            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
231                                          ("stretch_id", FIELDTYPE_INT)],
232                                         [(1, 0), (2, 3)])
233            discharges = AutoTransientTable(self.transientdb, mem_discharges)
234    
235            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
236                                         discharges, "stretch_id",
237                                         outer_join = True)
238    
239            self.assertEquals(table.NumRows(), 4)
240            self.assertEquals(table.NumColumns(), 3)
241    
242            # HasColumn
243            self.failUnless(table.HasColumn("stretch_id"))
244            self.failUnless(table.HasColumn("disch_id"))
245    
246    
247        def test_transient_joined_table_with_equal_column_names(self):
248            """Test TransientJoinedTable join on tables with equal column names
249    
250            If a name collision occurs for the field names, underscores are
251            appended as long as any collision is resolved.
252            """
253            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
254                                         ("name", FIELDTYPE_INT)],
255                                        [(0, 10), (1, 11), (2, 12), (3, 13) ])
256            stretches = AutoTransientTable(self.transientdb, mem_stretches)
257    
258            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
259                                          ("stretch_id", FIELDTYPE_INT),
260                                          ("name", FIELDTYPE_INT)],
261                                         [(1, 0, 1), (2, 3, 2)])
262            discharges = AutoTransientTable(self.transientdb, mem_discharges)
263    
264            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
265                                         discharges, "stretch_id",
266                                         outer_join = True)
267    
268            self.assertEquals(table.NumRows(), 4)
269            self.assertEquals(table.NumColumns(), 5)
270    
271            # HasColumn
272            self.assertEquals([c.name for c in table.Columns()],
273                              ["stretch_id", "name", "disch_id", "stretch_id_",
274                               "name_"])
275    
276        def test_transient_joined_table_name_collisions_dont_modify_in_place(self):
277            """Test TransientJoinedTable name-collisions do not modifying in place
278    
279            The name collision work-around by appending underscores
280            accidentally modified the column objects in place. We do two
281            joins therefore in reverse order to detect this: The first join
282            will lead to a modified name in the column object of the right
283            table which is then used as the left table in the second join so
284            the underscored name will appear before the non-underscored one
285            in the list of column names after the second join.
286            """
287            mem1 = MemoryTable([("stretch_id", FIELDTYPE_INT),
288                                ("name", FIELDTYPE_INT)],
289                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
290            table1 = AutoTransientTable(self.transientdb, mem1)
291    
292            mem2 = MemoryTable([("stretch_id", FIELDTYPE_INT),
293                                ("name", FIELDTYPE_INT)],
294                               [(0, 10), (1, 11), (2, 12), (3, 13) ])
295            table2 = AutoTransientTable(self.transientdb, mem2)
296    
297            table = TransientJoinedTable(self.transientdb, table1, "stretch_id",
298                                         table2, "stretch_id", outer_join = True)
299            table = TransientJoinedTable(self.transientdb, table2, "stretch_id",
300                                         table1, "stretch_id", outer_join = True)
301    
302            self.assertEquals([c.name for c in table.Columns()],
303                              ["stretch_id", "name", "stretch_id_", "name_"])
304    
305        def test_transient_table_read_twice(self):
306            """Test TransientTable.ReadRowAsDict() reading the same record twice"""
307            simple = MemoryTable([("type", FIELDTYPE_STRING),
308                                  ("code", FIELDTYPE_INT)],
309                                 [("OTHER/UNKNOWN", 0),
310                                  ("RUINS", 1),
311                                  ("FARM", 2),
312                                  ("BUILDING", 3),
313                                  ("HUT", 4),
314                                  ("LIGHTHOUSE", 5)])
315            table = TransientTable(self.transientdb, simple)
316    
317            # There was a bug where reading the same record twice would
318            # raise an exception in the second call because of an
319            # unitialized local variable, so for passing the test it's
320            # enough if reading simply succeeds. OTOH, while we're at it we
321            # might as well check whether the results are equal anyway :)
322            result1 = table.ReadRowAsDict(3)
323            result2 = table.ReadRowAsDict(3)
324            self.assertEquals(result1, result2)
325    
326    
327        def test_transient_table_query(self):
328            """Test TransientTable.SimpleQuery()"""
329            simple = MemoryTable([("type", FIELDTYPE_STRING),
330                                  ("value", FIELDTYPE_DOUBLE),
331                                  ("code", FIELDTYPE_INT)],
332                                 [("OTHER/UNKNOWN", -1.5, 11),
333                                  ("RUINS", 0.0, 1),
334                                  ("FARM", 3.141, 2),
335                                  ("BUILDING", 2.5, 3),
336                                  ("HUT", 1e6, 4),
337                                  ("LIGHTHOUSE", -0.01, 5)])
338            table = TransientTable(self.transientdb, simple)
339    
340            # A column and a value
341            self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
342                              [1])
343            self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
344                              [0, 1, 3, 4, 5])
345            self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
346                              [0, 1, 5])
347            self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
348                              [0])
349            self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
350                              [0, 4, 5])
351            self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
352                              [0, 3, 4, 5])
353    
354            # Two columns as operands
355            self.assertEquals(table.SimpleQuery(table.Column(1),
356                                                "<=", table.Column(2)),
357                              [0, 1, 3, 5])
358    
359            # Test whether invalid operators raise a ValueError
360            self.assertRaises(ValueError,
361                              table.SimpleQuery,
362                              table.Column(1), "<<", table.Column(2))
363    
364    
365  if __name__ == "__main__":  if __name__ == "__main__":

Legend:
Removed from v.765  
changed lines
  Added in v.1364

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26