/[thuban]/branches/WIP-pyshapelib-bramz/test/test_transientdb.py
ViewVC logotype

Diff of /branches/WIP-pyshapelib-bramz/test/test_transientdb.py

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 805 by jan, Fri May 2 16:42:35 2003 UTC revision 1332 by frank, Tue Jul 1 15:40:52 2003 UTC
# Line 20  import support Line 20  import support
20  support.initthuban()  support.initthuban()
21    
22  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \  from Thuban.Model.table import DBFTable, MemoryTable, FIELDTYPE_STRING, \
23                                 FIELDTYPE_INT                                 FIELDTYPE_INT, FIELDTYPE_DOUBLE
24  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \  from Thuban.Model.transientdb import TransientDatabase, TransientTable, \
25       TransientJoinedTable, AutoTransientTable       TransientJoinedTable, AutoTransientTable
26    
# Line 47  class TestTransientTable(unittest.TestCa Line 47  class TestTransientTable(unittest.TestCa
47          Assume that table holds the data of the file          Assume that table holds the data of the file
48          ../Data/iceland/political.dbf sample file.          ../Data/iceland/political.dbf sample file.
49          """          """
50          self.assertEquals(table.record_count(), 156)          self.assertEquals(table.NumRows(), 156)
51          self.assertEquals(table.field_count(), 8)          self.assertEquals(table.NumColumns(), 8)
52    
53          # Check one each of the possible field types. The width and          # Check one each of the possible field types.
54          # decimal precision is always 0.          columns = table.Columns()
55          self.assertEquals(table.field_info(0), ('double', 'AREA', 0, 0))          self.assertEquals(columns[0].name, 'AREA')
56          self.assertEquals(table.field_info(3), ('int', 'PONET_ID', 0, 0))          self.assertEquals(columns[0].type, FIELDTYPE_DOUBLE)
57          self.assertEquals(table.field_info(6), ('string', 'POPYCOUN', 0, 0))          self.assertEquals(columns[3].name, 'PONET_ID')
58            self.assertEquals(columns[3].type, FIELDTYPE_INT)
59            self.assertEquals(columns[6].name, 'POPYCOUN')
60            self.assertEquals(columns[6].type, FIELDTYPE_STRING)
61    
62            # HasColumn
63            self.failUnless(table.HasColumn("AREA"))
64            self.failUnless(table.HasColumn(1))
65            # HasColumn for non-exisiting columns
66            self.failIf(table.HasColumn("non_existing_name"))
67            self.failIf(table.HasColumn(100))
68    
69          # Read an `interesting' record          # Reading rows and values.
70          self.assertEquals(table.read_record(144),          self.assertEquals(table.ReadRowAsDict(144),
71                            {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,                            {'POPYCOUN': 'IC', 'POPYADMIN': '', 'PONET_': 146,
72                             'AREA': 19.462,                             'AREA': 19.462,
73                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,                             'POPYTYPE': 1, 'PERIMETER': 88.518000000000001,
74                             'POPYREG': '1',                             'POPYREG': '1',
75                             'PONET_ID': 145})                             'PONET_ID': 145})
76            self.assertEquals(table.ReadValue(144, "AREA"), 19.462)
77            self.assertEquals(table.ReadValue(144, 3), 145)
78    
79          # field_range may induce a copy to the transient database.          # ValueRange may induce a copy to the transient database.
80          # Therefore we put it last so that we can execute this method          # Therefore we put it last so that we can execute this method
81          # twice to check whether the other methods still work after the          # twice to check whether the other methods still work after the
82          # copy          # copy
83          self.assertEquals(table.field_range("AREA"),          self.assertEquals(table.ValueRange("AREA"), (0.0, 19.462))
                           ((0.0, None), (19.462, None)))  
84    
85          unique = table.GetUniqueValues("PONET_ID")          unique = table.UniqueValues("PONET_ID")
86          unique.sort()          unique.sort()
87          self.assertEquals(unique, range(1, 157))          self.assertEquals(unique, range(1, 157))
88    
# Line 89  class TestTransientTable(unittest.TestCa Line 100  class TestTransientTable(unittest.TestCa
100          # The transient_table method should return the table itself          # The transient_table method should return the table itself
101          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
102    
103            # The title is simply copied over from the original table
104            self.assertEquals(table.Title(), orig_table.Title())
105    
106            # The TransientTable class itself doesn't implement the
107            # Dependencies method, so we don't test it.
108    
109    
110      def test_auto_transient_table(self):      def test_auto_transient_table(self):
111          """Test AutoTransientTable(dbftable)          """Test AutoTransientTable(dbftable)
# Line 108  class TestTransientTable(unittest.TestCa Line 125  class TestTransientTable(unittest.TestCa
125          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
126          self.run_iceland_political_tests(table)          self.run_iceland_political_tests(table)
127    
128        def test_auto_transient_table_query(self):
129            """Test AutoTransientTable.SimpleQuery()"""
130            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
131                                               "political.dbf"))
132            table = AutoTransientTable(self.transientdb, orig_table)
133            # Only a simple test here. The AutoTransientTable simply
134            # delegates to its transient table so it should be OK that the
135            # real test for it is in test_transient_table_query. However,
136            # it's important to check that the column handling works
137            # correctly because the AutoTransientTable and it's underlying
138            # transient table use different column object types.
139            self.assertEquals(table.SimpleQuery(table.Column("AREA"), ">", 10.0),
140                              [144])
141    
142            # test using a Column object as the right parameter
143            self.assertEquals(table.SimpleQuery(table.Column("POPYTYPE"),
144                                                "==",
145                                                table.Column("POPYREG")),
146                              range(156))
147    
148        def test_auto_transient_table_dependencies(self):
149            """Test AutoTransientTable.Dependencies()"""
150            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
151                                               "political.dbf"))
152            table = AutoTransientTable(self.transientdb, orig_table)
153            self.assertEquals(table.Dependencies(), (orig_table,))
154    
155        def test_auto_transient_table_title(self):
156            """Test AutoTransientTable.Title()"""
157            orig_table = DBFTable(os.path.join("..", "Data", "iceland",
158                                               "political.dbf"))
159            table = AutoTransientTable(self.transientdb, orig_table)
160            # The title is of course the same as that of the original table
161            self.assertEquals(table.Title(), orig_table.Title())
162    
163      def test_transient_joined_table(self):      def test_transient_joined_table(self):
164          """Test TransientJoinedTable"""          """Test TransientJoinedTable"""
# Line 127  class TestTransientTable(unittest.TestCa Line 178  class TestTransientTable(unittest.TestCa
178          table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",          table = TransientJoinedTable(self.transientdb, landmarks, "CLPTLABEL",
179                                       auto, "type")                                       auto, "type")
180    
181          self.assertEquals(table.record_count(), 34)          self.assertEquals(table.NumRows(), 34)
182          self.assertEquals(table.field_count(), 8)          self.assertEquals(table.NumColumns(), 8)
183          self.assertEquals(table.field_info(0), ('double', 'AREA', 0, 0))          self.assertEquals(table.Column(0).type, FIELDTYPE_DOUBLE)
184          self.assertEquals(table.field_info(7), ('int', 'code', 0, 0))          self.assertEquals(table.Column(0).name, 'AREA')
185          self.assertEquals(table.field_info(4), ('string', 'CLPTLABEL', 0, 0))          self.assertEquals(table.Column(7).type, FIELDTYPE_INT)
186            self.assertEquals(table.Column(7).name, 'code')
187            self.assertEquals(table.Column(4).type, FIELDTYPE_STRING)
188            self.assertEquals(table.Column(4).name, 'CLPTLABEL')
189            # HasColumn
190            self.failUnless(table.HasColumn("AREA"))
191            self.failUnless(table.HasColumn(1))
192            # HasColumn for non-exisiting columns
193            self.failIf(table.HasColumn("non_existing_name"))
194            self.failIf(table.HasColumn(100))
195    
196          # Read an `interesting' record          # Reading rows and values
197          self.assertEquals(table.read_record(22),          self.assertEquals(table.ReadRowAsDict(22),
198                            {'PERIMETER': 0.0, 'CLPOINT_': 23,                            {'PERIMETER': 0.0, 'CLPOINT_': 23,
199                             'AREA': 0.0, 'CLPTLABEL': 'RUINS',                             'AREA': 0.0, 'CLPTLABEL': 'RUINS',
200                             'CLPOINT_ID': 38, 'CLPTFLAG': 0,                             'CLPOINT_ID': 38, 'CLPTFLAG': 0,
201                             'code': 1, 'type': 'RUINS'})                             'code': 1, 'type': 'RUINS'})
202            self.assertEquals(table.ReadValue(22, "type"), 'RUINS')
203            self.assertEquals(table.ReadValue(22, 7), 1)
204    
205          # The transient_table method should return the table itself          # The transient_table method should return the table itself
206          self.assert_(table is table.transient_table())          self.assert_(table is table.transient_table())
207    
208            # The TransientJoinedTable depends on both input tables
209            self.assertEquals(table.Dependencies(), (landmarks, auto))
210    
211            # The title is constructed from the titles of the input tables.
212            self.assertEquals(table.Title(),
213                              "Join of %s and %s" % (landmarks.Title(),
214                                                     auto.Title()))
215    
216    
217        def test_transient_joined_table_same_column_name(self):
218            """Test TransientJoinedTable join on columns with same name
219    
220            The transient DB maps the column names used by the tables to
221            another set of names used only inside the SQLite database. There
222            was a bug in the way this mapping was used when joining on
223            fields with the same names in both tables so that the joined
224            table ended up joining on the same column in the same table.
225            """
226            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT)],
227                                        [(i,) for i in range(4)])
228            stretches = AutoTransientTable(self.transientdb, mem_stretches)
229    
230            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
231                                          ("stretch_id", FIELDTYPE_INT)],
232                                         [(1, 0), (2, 3)])
233            discharges = AutoTransientTable(self.transientdb, mem_discharges)
234    
235            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
236                                         discharges, "stretch_id",
237                                         outer_join = True)
238    
239            self.assertEquals(table.NumRows(), 4)
240            self.assertEquals(table.NumColumns(), 2)
241    
242            # HasColumn
243            self.failUnless(table.HasColumn("stretch_id"))
244            self.failUnless(table.HasColumn("disch_id"))
245    
246    
247        def test_transient_joined_table_with_equal_column_names(self):
248            """Test TransientJoinedTable join on tables with equal column names
249    
250            The join of two tables contains all fields from both tables instead
251            th field the join was performed on. This special field is included
252            once. If a name collision occurs for the field names, underscores are
253            appended as long as any collision is resolved.      
254            """
255            mem_stretches = MemoryTable([("stretch_id", FIELDTYPE_INT),
256                                         ("name", FIELDTYPE_INT)],
257                                        [(0, 10), (1, 11), (2, 12), (3, 13) ])
258            stretches = AutoTransientTable(self.transientdb, mem_stretches)
259    
260            mem_discharges = MemoryTable([("disch_id", FIELDTYPE_INT),
261                                          ("stretch_id", FIELDTYPE_INT),
262                                          ("name", FIELDTYPE_INT)],
263                                         [(1, 0, 1), (2, 3, 2)])
264            discharges = AutoTransientTable(self.transientdb, mem_discharges)
265    
266            table = TransientJoinedTable(self.transientdb, stretches, "stretch_id",
267                                         discharges, "stretch_id",
268                                         outer_join = True)
269    
270            self.assertEquals(table.NumRows(), 4)
271            self.assertEquals(table.NumColumns(), 4)
272    
273            # HasColumn
274            self.failUnless(table.HasColumn("stretch_id"))
275            self.failUnless(table.HasColumn("disch_id"))
276            self.failUnless(table.HasColumn("name"))
277            self.failUnless(table.HasColumn("name_"))
278    
279    
280      def test_transient_table_read_twice(self):      def test_transient_table_read_twice(self):
281          """Test TransientTable.read_record() reading the same record twice"""          """Test TransientTable.ReadRowAsDict() reading the same record twice"""
282          simple = MemoryTable([("type", FIELDTYPE_STRING),          simple = MemoryTable([("type", FIELDTYPE_STRING),
283                                ("code", FIELDTYPE_INT)],                                ("code", FIELDTYPE_INT)],
284                               [("OTHER/UNKNOWN", 0),                               [("OTHER/UNKNOWN", 0),
# Line 161  class TestTransientTable(unittest.TestCa Line 294  class TestTransientTable(unittest.TestCa
294          # unitialized local variable, so for passing the test it's          # unitialized local variable, so for passing the test it's
295          # enough if reading simply succeeds. OTOH, while we're at it we          # enough if reading simply succeeds. OTOH, while we're at it we
296          # might as well check whether the results are equal anyway :)          # might as well check whether the results are equal anyway :)
297          result1 = table.read_record(3)          result1 = table.ReadRowAsDict(3)
298          result2 = table.read_record(3)          result2 = table.ReadRowAsDict(3)
299          self.assertEquals(result1, result2)          self.assertEquals(result1, result2)
300    
301    
302        def test_transient_table_query(self):
303            """Test TransientTable.SimpleQuery()"""
304            simple = MemoryTable([("type", FIELDTYPE_STRING),
305                                  ("value", FIELDTYPE_DOUBLE),
306                                  ("code", FIELDTYPE_INT)],
307                                 [("OTHER/UNKNOWN", -1.5, 11),
308                                  ("RUINS", 0.0, 1),
309                                  ("FARM", 3.141, 2),
310                                  ("BUILDING", 2.5, 3),
311                                  ("HUT", 1e6, 4),
312                                  ("LIGHTHOUSE", -0.01, 5)])
313            table = TransientTable(self.transientdb, simple)
314    
315            # A column and a value
316            self.assertEquals(table.SimpleQuery(table.Column(0), "==", "RUINS"),
317                              [1])
318            self.assertEquals(table.SimpleQuery(table.Column(2), "!=", 2),
319                              [0, 1, 3, 4, 5])
320            self.assertEquals(table.SimpleQuery(table.Column(1), "<", 1.0),
321                              [0, 1, 5])
322            self.assertEquals(table.SimpleQuery(table.Column(1), "<=", -1.5),
323                              [0])
324            self.assertEquals(table.SimpleQuery(table.Column(2), ">", 3),
325                              [0, 4, 5])
326            self.assertEquals(table.SimpleQuery(table.Column(2), ">=", 3),
327                              [0, 3, 4, 5])
328    
329            # Two columns as operands
330            self.assertEquals(table.SimpleQuery(table.Column(1),
331                                                "<=", table.Column(2)),
332                              [0, 1, 3, 5])
333    
334            # Test whether invalid operators raise a ValueError
335            self.assertRaises(ValueError,
336                              table.SimpleQuery,
337                              table.Column(1), "<<", table.Column(2))
338    
339    
340  if __name__ == "__main__":  if __name__ == "__main__":
341      support.run_tests()      support.run_tests()

Legend:
Removed from v.805  
changed lines
  Added in v.1332

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26