/[thuban]/branches/WIP-pyshapelib-bramz/test/test_connector.py
ViewVC logotype

Annotation of /branches/WIP-pyshapelib-bramz/test/test_connector.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2734 - (hide annotations)
Thu Mar 1 12:42:59 2007 UTC (18 years ago) by bramz
File MIME type: text/x-python
File size: 13942 byte(s)
made a copy
1 bh 2586 # Copyright (c) 2002, 2003, 2004 by Intevation GmbH
2 bh 329 # Authors:
3     # Bernhard Herzog <[email protected]>
4     #
5     # This program is free software under the GPL (>=v2)
6     # Read the file COPYING coming with Thuban for details.
7    
8     """
9     Test the Connector class
10     """
11    
12     __version__ = "$Revision$"
13     # $Source$
14     # $Id$
15    
16 bh 1953 import sys
17 bh 329 import unittest
18 bh 1953 import traceback
19 bh 329
20     import support
21     support.initthuban()
22    
23 bh 1953 from Thuban.Lib.connector import Connector, Publisher, Conduit, ConnectorError
24 bh 329
25     # some messages used in the tests
26     SIMPLE = "SIMPLE"
27     PARAM = "PARAM"
28    
29     class SimplePublisher:
30    
31     """A version of Publisher that uses a specific connector.
32    
33     The Publisher class in Thuban.Lib.connector uses the global
34     connector in the same module.
35     """
36    
37     def __init__(self, connector):
38     self.connector = connector
39    
40     def __del__(self):
41     self.connector.RemovePublisher(self)
42    
43     def issue(self):
44     """Issue a SIMPLE message without parameters"""
45     self.connector.Issue(self, SIMPLE)
46    
47     def issue_arg(self):
48     """Issue a PARAM message with 42 as parameter"""
49     self.connector.Issue(self, PARAM, 42)
50    
51    
52     class RealPublisher(Publisher):
53    
54     """Extended version of Publisher for testing purposes.
55    
56     Publisher is not intended to be used directly. It is used as a base
57     class for objects that send messages when they change. So we do just
58     that here and derive from Publisher to provide some simple methods
59     that issue messages.
60     """
61    
62     def simple_action(self):
63     """Issue a SIMPLE message without parameters"""
64     self.issue(SIMPLE)
65    
66     def param_action(self):
67     """Issue a PARAM message with 42 as parameter"""
68     self.issue(PARAM, 42)
69    
70    
71     class Receiver:
72    
73     """Class to be used as a generic receiver of messages.
74    
75     An instance of this class has some methods that can be used as
76     subscribers for messages. These messages put information about the
77     messages they received into the public instance variable messages.
78     See the method's doc-strings for more information.
79    
80     Furthermore, the class is instantiated with a test case object as
81     parameter and the instance notifies the test case when it's being
82     instantiated and deleted so that the test case can determine which
83     objects weren't deleted.
84     """
85    
86     def __init__(self, testcase):
87     """Initialize the object for the given testcase.
88    
89     Call the testcase's expect_delete method with self as parameter.
90     """
91     self.testcase = testcase
92     self.testcase.expect_delete(self)
93     self.reset()
94    
95     def __del__(self):
96     """Tell the test case that the object has been deleted"""
97     self.testcase.deleted(self)
98    
99     def reset(self):
100     """Clear the list of received messages"""
101     self.messages = []
102    
103     def no_params(self):
104     """Method for subscriptions without parameters
105    
106     Add the tuple ("no_params",) to self.messages
107     """
108     self.messages.append(("no_params",))
109    
110     def with_params(self, *args):
111     """Method for subscriptions with parameters
112    
113     Add a tuple with the string 'params' followed by the arguments
114     of this function (except for the self parameter) to
115     self.messages.
116     """
117     self.messages.append(("params",) + args)
118    
119    
120    
121     class DeletionTestMixin:
122    
123     """Mixin class to check for memory leaks.
124    
125     Mixin class for test that want to determine whether certain objects
126     have been destroyed.
127    
128     This class maintains two lists, deleted_objects and
129     expected_deletions to determine whether all objects which are
130     expected to be deleted by a test are actually deleted.
131     """
132    
133     def setUp(self):
134     """Initialize self.deleted_objects and self.expected_deletions"""
135     self.deleted_objects = []
136     self.expected_deletions = []
137    
138     def expect_delete(self, obj):
139     """Append the id of obj to the self.expected_deletions"""
140     self.expected_deletions.append(id(obj))
141    
142     def deleted(self, obj):
143     """Append the id of obj to the self.deleted_objects"""
144     self.deleted_objects.append(id(obj))
145    
146 bh 2586 def check_deletions(self):
147 bh 329 """Assert equality of self.expected_deletions and self.deleted_objects
148    
149     This check simply compares the lists for equality and thus
150     effectively assumes that the objects are deleted in the same
151     order in which they're added to the list which if used only for
152     Receiver instances is the order in which they're instantiated.
153     """
154     self.assertEquals(self.expected_deletions, self.deleted_objects)
155    
156    
157     class ConnectorTest(unittest.TestCase, DeletionTestMixin):
158    
159     """Test cases for the Connector class.
160    
161     These tests use the SimplePublisher class instead of the Publisher
162     class in Thuban.Lib.connector because we only want to test the
163     connector here.
164     """
165    
166     def setUp(self):
167     """Extend the inherited method to create a Connector instance.
168    
169     Bind the Connector to self.connector.
170     """
171     self.connector = Connector()
172     DeletionTestMixin.setUp(self)
173    
174     def test_issue_simple(self):
175     """Test connector issue without parameters"""
176     # Make a publisher and a subscriber and connect the two
177     pub = SimplePublisher(self.connector)
178     rec = Receiver(self)
179     self.connector.Connect(pub, SIMPLE, rec.no_params, ())
180    
181     # now the publisher should have subscribers
182     self.assert_(self.connector.HasSubscribers(pub))
183    
184     # Issue a message and check whether the receiver got it
185     pub.issue()
186     self.assertEquals(rec.messages, [("no_params",)])
187     rec.reset()
188    
189     # disconnect and check that the message doesn't get send anymore
190     self.connector.Disconnect(pub, SIMPLE, rec.no_params, ())
191     pub.issue()
192     self.assertEquals(rec.messages, [])
193    
194     # now the publisher should have no subscribers
195     self.failIf(self.connector.HasSubscribers(pub))
196    
197     # make sure that all references have been deleted
198     del rec
199 bh 2586 self.check_deletions()
200 bh 329
201     def test_issue_param(self):
202     """Test connector issue with parameters"""
203     pub = SimplePublisher(self.connector)
204     rec = Receiver(self)
205     # Three cases: 1. The parameter supplied by pub.issue_arg, 2.
206     # only the parameter given when connecting, 3. both
207     self.connector.Connect(pub, PARAM, rec.with_params, ())
208     self.connector.Connect(pub, SIMPLE, rec.with_params, ("deliverator",))
209     self.connector.Connect(pub, PARAM, rec.with_params, ("loglo",))
210    
211     pub.issue_arg()
212     pub.issue()
213     self.assertEquals(rec.messages, [("params", 42),
214     ("params", 42, "loglo"),
215     ("params", "deliverator")])
216    
217     # make sure that all references have been deleted
218     self.connector.RemovePublisher(pub)
219     del rec
220 bh 2586 self.check_deletions()
221 bh 329
222     def test_cyclic_references(self):
223     """Test whether connector avoids cyclic references"""
224     pub = SimplePublisher(self.connector)
225     rec = Receiver(self)
226     self.connector.Connect(pub, SIMPLE, rec.no_params, ())
227    
228     # deleting pub and rec should be enough that the last reference
229     # to rec has been dropped because the connector doesn't keep
230     # references to the publishers and SimplePublisher's __del__
231     # method removes all subscriptions
232     del pub
233     del rec
234 bh 2586 self.check_deletions()
235 bh 329
236 bh 1144 def test_disconnect_in_receiver(self):
237     """Test unsubscribing from a channel while receiving a message
238 bh 329
239 bh 1144 There was a bug in the connector implementation in the following
240     situation:
241    
242     - 2 receivers for the same channel
243    
244     - the reiver called first unsubscribes itself from that channel
245     in response to a message on that channel
246    
247     Now the second receiver is never called because the list of
248     receivers was modified by Disconnect while the connecter was
249     iterating over it.
250     """
251     messages = []
252     def rec1(*args):
253     try:
254     messages.append("rec1")
255     self.connector.Disconnect(None, SIMPLE, rec1, ())
256     except:
257     self.fail("Exception in rec1")
258     def rec2(*args):
259     try:
260     messages.append("rec2")
261     self.connector.Disconnect(None, SIMPLE, rec2, ())
262     except:
263     self.fail("Exception in rec1")
264    
265     self.connector.Connect(None, SIMPLE, rec1, ())
266     self.connector.Connect(None, SIMPLE, rec2, ())
267    
268     self.connector.Issue(None, SIMPLE)
269    
270     self.assertEquals(messages, [("rec1"), ("rec2")])
271    
272    
273 bh 329 class TestPublisher(unittest.TestCase, DeletionTestMixin):
274    
275     """Tests for the Publisher class"""
276    
277     def setUp(self):
278     DeletionTestMixin.setUp(self)
279    
280     def test_issue_simple(self):
281     """Test Publisher message without parameters"""
282     # Make a publisher and a subscriber and connect the two
283     pub = RealPublisher()
284     rec = Receiver(self)
285     pub.Subscribe(SIMPLE, rec.no_params)
286    
287     # Issue a message and check whether the receiver got it
288     pub.simple_action()
289     self.assertEquals(rec.messages, [("no_params",)])
290     rec.reset()
291    
292 bh 1779 # disconnect and check that the message doesn't get sent anymore
293 bh 329 pub.Unsubscribe(SIMPLE, rec.no_params)
294     pub.simple_action()
295     self.assertEquals(rec.messages, [])
296    
297     # make sure that all references have been deleted
298     del rec
299 bh 2586 self.check_deletions()
300 bh 329
301     def test_issue_param(self):
302     """Test Publisher message with parameters"""
303     pub = RealPublisher()
304     rec = Receiver(self)
305     # Three cases: 1. The parameter supplied by pub.issue_arg, 2.
306     # only the parameter given when connecting, 3. both
307     pub.Subscribe(PARAM, rec.with_params)
308     pub.Subscribe(SIMPLE, rec.with_params, "deliverator")
309     pub.Subscribe(PARAM, rec.with_params, "loglo")
310    
311     pub.param_action()
312     pub.simple_action()
313     self.assertEquals(rec.messages, [("params", 42),
314     ("params", 42, "loglo"),
315     ("params", "deliverator")])
316    
317     # make sure that all references have been deleted
318     pub.Destroy()
319     del rec
320 bh 2586 self.check_deletions()
321 bh 329
322     def test_cyclic_references(self):
323     """Test whether Publisher avoids cyclic references"""
324     pub = RealPublisher()
325     rec = Receiver(self)
326     pub.Subscribe(SIMPLE, rec.no_params, ())
327    
328     # deleting pub and rec should be enough that the last reference
329     # to rec has been dropped because the connector doesn't keep
330     # references to the publishers and SimplePublisher's __del__
331     # method removes all subscriptions
332     del pub
333     del rec
334 bh 2586 self.check_deletions()
335 bh 329
336 bh 1953 def test_unsubscribe_after_destroy(self):
337     """Test that Unsubscribe() does not raise exceptions after a Destroy"""
338     pub = RealPublisher()
339     rec = Receiver(self)
340     pub.Subscribe(SIMPLE, rec.no_params)
341 bh 329
342 bh 1953 # Sanity check: Issue a message and check whether the receiver
343     # got it
344     pub.simple_action()
345     self.assertEquals(rec.messages, [("no_params",)])
346     rec.reset()
347    
348     # Now the real test. Destroy the publisher and Unsubscribe the
349     # receiver afterwards. The Unsubscribe should not raise an
350     # exception.
351     pub.Destroy()
352     try:
353     pub.Unsubscribe(SIMPLE, rec.no_params)
354     except ConnectorError:
355     self.fail("Unsubscribe after Destroy raised exception:\n"+
356     "".join(traceback.format_exception(*sys.exc_info())))
357    
358    
359 bh 1779 class MyConduit(Conduit):
360 bh 329
361 bh 1779 """Class for use in the Conduit tests
362    
363     Like publishers Conduits are not instantiated themselves they're
364     always used as base classes.
365     """
366    
367     def __init__(self, forward):
368     self.forward = forward
369     self.subscribe_forwarding(SIMPLE, self.forward)
370    
371     def set_forward(self, forward):
372     # NOTE: The fact the we simply pass self.forward through to
373     # unsubscribe_forwarding and subscribe_forwarding is used by the
374     # test for None handling.
375     self.unsubscribe_forwarding(SIMPLE, self.forward)
376     self.forward = forward
377     self.subscribe_forwarding(SIMPLE, self.forward)
378    
379     def action(self):
380     self.delegate.param_action()
381    
382    
383     class TestConduit(unittest.TestCase, support.SubscriberMixin):
384    
385     """Tests for the Conduit class"""
386    
387     def setUp(self):
388     self.publisher = RealPublisher()
389     self.other_publisher = RealPublisher()
390     self.clear_messages()
391    
392     def tearDown(self):
393     self.clear_messages()
394     self.publisher.Destroy()
395     self.other_publisher.Destroy()
396     self.publisher = self.other_publisher = None
397    
398     def test_forwarding(self):
399     """Test conduit forwarding"""
400     cond = MyConduit(self.publisher)
401     cond.Subscribe(SIMPLE, self.subscribe_with_params, SIMPLE)
402     self.publisher.simple_action()
403     self.check_messages([(SIMPLE,)])
404     self.clear_messages()
405    
406     # Set a different publisher. The message of the new publisher
407     # will be forwarded through the conduit but not that of the old
408     # one anymore
409     cond.set_forward(self.other_publisher)
410     self.other_publisher.simple_action()
411     self.check_messages([(SIMPLE,)])
412     self.clear_messages()
413    
414     self.publisher.simple_action()
415     self.check_messages([])
416     self.clear_messages()
417    
418     def test_none_handling(self):
419     """Test whether (un)subscribe_forwarding work with None"""
420     # All we test is whether it works without raising exceptions :)
421     cond = MyConduit(None)
422     cond.set_forward(None)
423    
424    
425 bh 329 if __name__ == "__main__":
426 bh 1779 support.run_tests()

Properties

Name Value
svn:eol-style native
svn:keywords Author Date Id Revision

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26