/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 765 by bh, Tue Apr 29 12:42:14 2003 UTC revision 1328 by bh, Tue Jul 1 12:01:58 2003 UTC
# Line 19  import unittest Line 19  import unittest
19  import support  import support
20  support.initthuban()  support.initthuban()
21    
22  from Thuban.Model.table import DBFTable, FIELDTYPE_STRING, FIELDTYPE_INT  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
23                                   FIELDTYPE_INT, FIELDTYPE_DOUBLE
24  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
25       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
26    
27    
 class SimpleTable:  
   
     """Very simple table implementation that operates on a list of tuples"""  
   
     def __init__(self, fields, data):  
         """Initialize the SimpleTable  
   
         Parameters:  
         fields -- List of (name, field_type) pairs  
         data -- List of tuples, one for each row of data  
         """  
         self.fields = fields  
         self.data = data  
   
     def field_count(self):  
         return len(self.fields)  
   
     def field_info(self, index):  
         name, type = self.fields[index]  
         return (type, name, 0, 0)  
   
     def record_count(self):  
         return len(self.data)  
   
     def read_record(self, index):  
         return dict([(self.fields[i][0], self.data[index][i])  
                       for i in range(len(self.fields))])  
   
   
28  class TestTransientTable(unittest.TestCase, support.FileTestMixin):  class TestTransientTable(unittest.TestCase, support.FileTestMixin):
29    
30      def setUp(self):      def setUp(self):
# Line 75  class TestTransientTable(unittest.TestCa Line 47  class TestTransientTable(unittest.TestCa
47          Assume that table holds the data of the file          Assume that table holds the data of the file
48          ../Data/iceland/political.dbf sample file.          ../Data/iceland/political.dbf sample file.
49          """          """
50          self.assertEquals(table.record_count(), 156)          self.assertEquals(table.NumRows(), 156)
51          self.assertEquals(table.field_count(), 8)          self.assertEquals(table.NumColumns(), 8)
52    
53          # Check one each of the possible field types. The width and          # Check one each of the possible field types.
54          # decimal precision is always 0.          columns = table.Columns()
55          self.assertEquals(table.field_info(0), ('double', 'AREA', 0, 0))          self.assertEquals(columns[0].name, 'AREA')
56          self.assertEquals(table.field_info(3), ('int', 'PONET_ID', 0, 0))          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
57          self.assertEquals(table.field_info(6), ('string', 'POPYCOUN', 0, 0))          self.assertEquals(columns[3].name, 'PONET_ID')
58            self.assertEquals(columns[3].type, FIELDTYPE_INT)
59            self.assertEquals(columns[6].name, 'POPYCOUN')
60            self.assertEquals(columns[6].type, FIELDTYPE_STRING)
61    
62            # HasColumn
63            self.failUnless(table.HasColumn("AREA"))
64            self.failUnless(table.HasColumn(1))
65            # HasColumn for non-exisiting columns
66            self.failIf(table.HasColumn("non_existing_name"))
67            self.failIf(table.HasColumn(100))
68    
69          # Read an `interesting' record          # Reading rows and values.
70          self.assertEquals(table.read_record(144),          self.assertEquals(table.ReadRowAsDict(144),
71                            {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,                            {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
72                             'AREA': 19.462,                             'AREA': 19.462,
73                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
74                             'POPYREG': '1',                             'POPYREG': '1',
75                             'PONET_ID': 145})                             'PONET_ID': 145})
76            self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
77            self.assertEquals(table.ReadValue(144, 3), 145)
78    
79          # field_range may induce a copy to the transient database.          # ValueRange may induce a copy to the transient database.
80          # Therefore we put it last so that we can execute this method          # Therefore we put it last so that we can execute this method
81          # twice to check whether the other methods still work after the          # twice to check whether the other methods still work after the
82          # copy          # copy
83          self.assertEquals(table.field_range("AREA"),          self.assertEquals(table.ValueRange("AREA"), (0.0, 19.462))
                           ((0.0, None), (19.462, None)))  
84    
85          unique = table.GetUniqueValues("PONET_ID")          unique = table.UniqueValues("PONET_ID")
86          unique.sort()          unique.sort()
87          self.assertEquals(unique, range(1, 157))          self.assertEquals(unique, range(1, 157))
88    
# Line 117  class TestTransientTable(unittest.TestCa Line 100  class TestTransientTable(unittest.TestCa
100          # The transient_table method should return the table itself          # The transient_table method should return the table itself
101          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
102    
103            # The title is simply copied over from the original table
104            self.assertEquals(table.Title(), orig_table.Title())
105    
106            # The TransientTable class itself doesn't implement the
107            # Dependencies method, so we don't test it.
108    
109    
110      def test_auto_transient_table(self):      def test_auto_transient_table(self):
111          """Test AutoTransientTable(dbftable)          """Test AutoTransientTable(dbftable)
# Line 136  class TestTransientTable(unittest.TestCa Line 125  class TestTransientTable(unittest.TestCa
125          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
126          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
127    
128        def test_auto_transient_table_query(self):
129            """Test AutoTransientTable.SimpleQuery()"""
130            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
131                                               "political.dbf"))
132            table = AutoTransientTable(self.transientdb, orig_table)
133            # Only a simple test here. The AutoTransientTable simply
134            # delegates to its transient table so it should be OK that the
135            # real test for it is in test_transient_table_query. However,
136            # it's important to check that the column handling works
137            # correctly because the AutoTransientTable and it's underlying
138            # transient table use different column object types.
139            self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
140                              [144])
141    
142            # test using a Column object as the right parameter
143            self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
144                                                "==",
145                                                table.Column("POPYREG")),
146                              range(156))
147    
148        def test_auto_transient_table_dependencies(self):
149            """Test AutoTransientTable.Dependencies()"""
150            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
151                                               "political.dbf"))
152            table = AutoTransientTable(self.transientdb, orig_table)
153            self.assertEquals(table.Dependencies(), (orig_table,))
154    
155        def test_auto_transient_table_title(self):
156            """Test AutoTransientTable.Title()"""
157            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
158                                               "political.dbf"))
159            table = AutoTransientTable(self.transientdb, orig_table)
160            # The title is of course the same as that of the original table
161            self.assertEquals(table.Title(), orig_table.Title())
162    
163      def test_transient_joined_table(self):      def test_transient_joined_table(self):
164          """Test TransientJoinedTable"""          """Test TransientJoinedTable"""
165          simple = SimpleTable([("type", FIELDTYPE_STRING),          simple = MemoryTable([("type", FIELDTYPE_STRING),
166                                ("code", FIELDTYPE_INT)],                                ("code", FIELDTYPE_INT)],
167                               [("OTHER/UNKNOWN", 0),                               [("OTHER/UNKNOWN", 0),
168                                ("RUINS", 1),                                ("RUINS", 1),
# Line 155  class TestTransientTable(unittest.TestCa Line 178  class TestTransientTable(unittest.TestCa
178          table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",          table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
179                                       auto, "type")                                       auto, "type")
180    
181          self.assertEquals(table.record_count(), 34)          self.assertEquals(table.NumRows(), 34)
182          self.assertEquals(table.field_count(), 8)          self.assertEquals(table.NumColumns(), 8)
183          self.assertEquals(table.field_info(0), ('double', 'AREA', 0, 0))          self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
184          self.assertEquals(table.field_info(7), ('int', 'code', 0, 0))          self.assertEquals(table.Column(0).name, 'AREA')
185          self.assertEquals(table.field_info(4), ('string', 'CLPTLABEL', 0, 0))          self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
186            self.assertEquals(table.Column(7).name, 'code')
187            self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
188            self.assertEquals(table.Column(4).name, 'CLPTLABEL')
189            # HasColumn
190            self.failUnless(table.HasColumn("AREA"))
191            self.failUnless(table.HasColumn(1))
192            # HasColumn for non-exisiting columns
193            self.failIf(table.HasColumn("non_existing_name"))
194            self.failIf(table.HasColumn(100))
195    
196          # Read an `interesting' record          # Reading rows and values
197          self.assertEquals(table.read_record(22),          self.assertEquals(table.ReadRowAsDict(22),
198                            {'PERIMETER': 0.0, 'CLPOINT_': 23,                            {'PERIMETER': 0.0, 'CLPOINT_': 23,
199                             'AREA': 0.0, 'CLPTLABEL': 'RUINS',                             'AREA': 0.0, 'CLPTLABEL': 'RUINS',
200                             'CLPOINT_ID': 38, 'CLPTFLAG': 0,                             'CLPOINT_ID': 38, 'CLPTFLAG': 0,
201                             'code': 1, 'type': 'RUINS'})                             'code': 1, 'type': 'RUINS'})
202            self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
203            self.assertEquals(table.ReadValue(22, 7), 1)
204    
205          # The transient_table method should return the table itself          # The transient_table method should return the table itself
206          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
207    
208            # The TransientJoinedTable depends on both input tables
209            self.assertEquals(table.Dependencies(), (landmarks, auto))
210    
211            # The title is constructed from the titles of the input tables.
212            self.assertEquals(table.Title(),
213                              "Join of %s and %s" % (landmarks.Title(),
214                                                     auto.Title()))
215    
216    
217        def test_transient_joined_table_same_column_name(self):
218            """Test TransientJoinedTable join on columns with same name
219    
220            The transient DB maps the column names used by the tables to
221            another set of names used only inside the SQLite database. There
222            was a bug in the way this mapping was used when joining on
223            fields with the same names in both tables so that the joined
224            table ended up joining on the same column in the same table.
225            """
226            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
227                                        [(i,) for i in range(4)])
228            stretches = AutoTransientTable(self.transientdb, mem_stretches)
229    
230            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
231                                          ("stretch_id", FIELDTYPE_INT)],
232                                         [(1, 0), (2, 3)])
233            discharges = AutoTransientTable(self.transientdb, mem_discharges)
234    
235            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
236                                         discharges, "stretch_id",
237                                         outer_join = True)
238    
239            self.assertEquals(table.NumRows(), 4)
240            self.assertEquals(table.NumColumns(), 2)
241    
242            # HasColumn
243            self.failUnless(table.HasColumn("stretch_id"))
244            self.failUnless(table.HasColumn("disch_id"))
245    
246    
247        def test_transient_table_read_twice(self):
248            """Test TransientTable.ReadRowAsDict() reading the same record twice"""
249            simple = MemoryTable([("type", FIELDTYPE_STRING),
250                                  ("code", FIELDTYPE_INT)],
251                                 [("OTHER/UNKNOWN", 0),
252                                  ("RUINS", 1),
253                                  ("FARM", 2),
254                                  ("BUILDING", 3),
255                                  ("HUT", 4),
256                                  ("LIGHTHOUSE", 5)])
257            table = TransientTable(self.transientdb, simple)
258    
259            # There was a bug where reading the same record twice would
260            # raise an exception in the second call because of an
261            # unitialized local variable, so for passing the test it's
262            # enough if reading simply succeeds. OTOH, while we're at it we
263            # might as well check whether the results are equal anyway :)
264            result1 = table.ReadRowAsDict(3)
265            result2 = table.ReadRowAsDict(3)
266            self.assertEquals(result1, result2)
267    
268    
269        def test_transient_table_query(self):
270            """Test TransientTable.SimpleQuery()"""
271            simple = MemoryTable([("type", FIELDTYPE_STRING),
272                                  ("value", FIELDTYPE_DOUBLE),
273                                  ("code", FIELDTYPE_INT)],
274                                 [("OTHER/UNKNOWN", -1.5, 11),
275                                  ("RUINS", 0.0, 1),
276                                  ("FARM", 3.141, 2),
277                                  ("BUILDING", 2.5, 3),
278                                  ("HUT", 1e6, 4),
279                                  ("LIGHTHOUSE", -0.01, 5)])
280            table = TransientTable(self.transientdb, simple)
281    
282            # A column and a value
283            self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
284                              [1])
285            self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
286                              [0, 1, 3, 4, 5])
287            self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
288                              [0, 1, 5])
289            self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
290                              [0])
291            self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
292                              [0, 4, 5])
293            self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
294                              [0, 3, 4, 5])
295    
296            # Two columns as operands
297            self.assertEquals(table.SimpleQuery(table.Column(1),
298                                                "<=", table.Column(2)),
299                              [0, 1, 3, 5])
300    
301            # Test whether invalid operators raise a ValueError
302            self.assertRaises(ValueError,
303                              table.SimpleQuery,
304                              table.Column(1), "<<", table.Column(2))
305    
306    
307  if __name__ == "__main__":  if __name__ == "__main__":

Legend:
Removed from v.765  
changed lines
  Added in v.1328

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26