Package cherrypy :: Package test :: Module test_bus
[hide private]
[frames] | no frames]

Source Code for Module cherrypy.test.test_bus

  1  import threading 
  2  import time 
  3  import unittest 
  4   
  5  import cherrypy 
  6  from cherrypy._cpcompat import get_daemon, set 
  7  from cherrypy.process import wspbus 
  8   
  9   
 10  msg = "Listener %d on channel %s: %s." 
 11   
 12   
13 -class PublishSubscribeTests(unittest.TestCase):
14
15 - def get_listener(self, channel, index):
16 def listener(arg=None): 17 self.responses.append(msg % (index, channel, arg))
18 return listener
19
20 - def test_builtin_channels(self):
21 b = wspbus.Bus() 22 23 self.responses, expected = [], [] 24 25 for channel in b.listeners: 26 for index, priority in enumerate([100, 50, 0, 51]): 27 b.subscribe(channel, 28 self.get_listener(channel, index), priority) 29 30 for channel in b.listeners: 31 b.publish(channel) 32 expected.extend([msg % (i, channel, None) for i in (2, 1, 3, 0)]) 33 b.publish(channel, arg=79347) 34 expected.extend([msg % (i, channel, 79347) for i in (2, 1, 3, 0)]) 35 36 self.assertEqual(self.responses, expected)
37
38 - def test_custom_channels(self):
39 b = wspbus.Bus() 40 41 self.responses, expected = [], [] 42 43 custom_listeners = ('hugh', 'louis', 'dewey') 44 for channel in custom_listeners: 45 for index, priority in enumerate([None, 10, 60, 40]): 46 b.subscribe(channel, 47 self.get_listener(channel, index), priority) 48 49 for channel in custom_listeners: 50 b.publish(channel, 'ah so') 51 expected.extend([msg % (i, channel, 'ah so') 52 for i in (1, 3, 0, 2)]) 53 b.publish(channel) 54 expected.extend([msg % (i, channel, None) for i in (1, 3, 0, 2)]) 55 56 self.assertEqual(self.responses, expected)
57
58 - def test_listener_errors(self):
59 b = wspbus.Bus() 60 61 self.responses, expected = [], [] 62 channels = [c for c in b.listeners if c != 'log'] 63 64 for channel in channels: 65 b.subscribe(channel, self.get_listener(channel, 1)) 66 # This will break since the lambda takes no args. 67 b.subscribe(channel, lambda: None, priority=20) 68 69 for channel in channels: 70 self.assertRaises(wspbus.ChannelFailures, b.publish, channel, 123) 71 expected.append(msg % (1, channel, 123)) 72 73 self.assertEqual(self.responses, expected)
74 75
76 -class BusMethodTests(unittest.TestCase):
77
78 - def log(self, bus):
79 self._log_entries = [] 80 81 def logit(msg, level): 82 self._log_entries.append(msg)
83 bus.subscribe('log', logit)
84
85 - def assertLog(self, entries):
86 self.assertEqual(self._log_entries, entries)
87
88 - def get_listener(self, channel, index):
89 def listener(arg=None): 90 self.responses.append(msg % (index, channel, arg))
91 return listener 92
93 - def test_start(self):
94 b = wspbus.Bus() 95 self.log(b) 96 97 self.responses = [] 98 num = 3 99 for index in range(num): 100 b.subscribe('start', self.get_listener('start', index)) 101 102 b.start() 103 try: 104 # The start method MUST call all 'start' listeners. 105 self.assertEqual( 106 set(self.responses), 107 set([msg % (i, 'start', None) for i in range(num)])) 108 # The start method MUST move the state to STARTED 109 # (or EXITING, if errors occur) 110 self.assertEqual(b.state, b.states.STARTED) 111 # The start method MUST log its states. 112 self.assertLog(['Bus STARTING', 'Bus STARTED']) 113 finally: 114 # Exit so the atexit handler doesn't complain. 115 b.exit()
116
117 - def test_stop(self):
118 b = wspbus.Bus() 119 self.log(b) 120 121 self.responses = [] 122 num = 3 123 for index in range(num): 124 b.subscribe('stop', self.get_listener('stop', index)) 125 126 b.stop() 127 128 # The stop method MUST call all 'stop' listeners. 129 self.assertEqual(set(self.responses), 130 set([msg % (i, 'stop', None) for i in range(num)])) 131 # The stop method MUST move the state to STOPPED 132 self.assertEqual(b.state, b.states.STOPPED) 133 # The stop method MUST log its states. 134 self.assertLog(['Bus STOPPING', 'Bus STOPPED'])
135
136 - def test_graceful(self):
137 b = wspbus.Bus() 138 self.log(b) 139 140 self.responses = [] 141 num = 3 142 for index in range(num): 143 b.subscribe('graceful', self.get_listener('graceful', index)) 144 145 b.graceful() 146 147 # The graceful method MUST call all 'graceful' listeners. 148 self.assertEqual( 149 set(self.responses), 150 set([msg % (i, 'graceful', None) for i in range(num)])) 151 # The graceful method MUST log its states. 152 self.assertLog(['Bus graceful'])
153
154 - def test_exit(self):
155 b = wspbus.Bus() 156 self.log(b) 157 158 self.responses = [] 159 num = 3 160 for index in range(num): 161 b.subscribe('stop', self.get_listener('stop', index)) 162 b.subscribe('exit', self.get_listener('exit', index)) 163 164 b.exit() 165 166 # The exit method MUST call all 'stop' listeners, 167 # and then all 'exit' listeners. 168 self.assertEqual(set(self.responses), 169 set([msg % (i, 'stop', None) for i in range(num)] + 170 [msg % (i, 'exit', None) for i in range(num)])) 171 # The exit method MUST move the state to EXITING 172 self.assertEqual(b.state, b.states.EXITING) 173 # The exit method MUST log its states. 174 self.assertLog( 175 ['Bus STOPPING', 'Bus STOPPED', 'Bus EXITING', 'Bus EXITED'])
176
177 - def test_wait(self):
178 b = wspbus.Bus() 179 180 def f(method): 181 time.sleep(0.2) 182 getattr(b, method)()
183 184 for method, states in [('start', [b.states.STARTED]), 185 ('stop', [b.states.STOPPED]), 186 ('start', 187 [b.states.STARTING, b.states.STARTED]), 188 ('exit', [b.states.EXITING]), 189 ]: 190 threading.Thread(target=f, args=(method,)).start() 191 b.wait(states) 192 193 # The wait method MUST wait for the given state(s). 194 if b.state not in states: 195 self.fail("State %r not in %r" % (b.state, states)) 196
197 - def test_block(self):
198 b = wspbus.Bus() 199 self.log(b) 200 201 def f(): 202 time.sleep(0.2) 203 b.exit()
204 205 def g(): 206 time.sleep(0.4) 207 threading.Thread(target=f).start() 208 threading.Thread(target=g).start() 209 threads = [t for t in threading.enumerate() if not get_daemon(t)] 210 self.assertEqual(len(threads), 3) 211 212 b.block() 213 214 # The block method MUST wait for the EXITING state. 215 self.assertEqual(b.state, b.states.EXITING) 216 # The block method MUST wait for ALL non-main, non-daemon threads to 217 # finish. 218 threads = [t for t in threading.enumerate() if not get_daemon(t)] 219 self.assertEqual(len(threads), 1) 220 # The last message will mention an indeterminable thread name; ignore 221 # it 222 self.assertEqual(self._log_entries[:-1], 223 ['Bus STOPPING', 'Bus STOPPED', 224 'Bus EXITING', 'Bus EXITED', 225 'Waiting for child threads to terminate...']) 226
227 - def test_start_with_callback(self):
228 b = wspbus.Bus() 229 self.log(b) 230 try: 231 events = [] 232 233 def f(*args, **kwargs): 234 events.append(("f", args, kwargs))
235 236 def g(): 237 events.append("g") 238 b.subscribe("start", g) 239 b.start_with_callback(f, (1, 3, 5), {"foo": "bar"}) 240 # Give wait() time to run f() 241 time.sleep(0.2) 242 243 # The callback method MUST wait for the STARTED state. 244 self.assertEqual(b.state, b.states.STARTED) 245 # The callback method MUST run after all start methods. 246 self.assertEqual(events, ["g", ("f", (1, 3, 5), {"foo": "bar"})]) 247 finally: 248 b.exit() 249
250 - def test_log(self):
251 b = wspbus.Bus() 252 self.log(b) 253 self.assertLog([]) 254 255 # Try a normal message. 256 expected = [] 257 for msg in ["O mah darlin'"] * 3 + ["Clementiiiiiiiine"]: 258 b.log(msg) 259 expected.append(msg) 260 self.assertLog(expected) 261 262 # Try an error message 263 try: 264 foo 265 except NameError: 266 b.log("You are lost and gone forever", traceback=True) 267 lastmsg = self._log_entries[-1] 268 if "Traceback" not in lastmsg or "NameError" not in lastmsg: 269 self.fail("Last log message %r did not contain " 270 "the expected traceback." % lastmsg) 271 else: 272 self.fail("NameError was not raised as expected.")
273 274 275 if __name__ == "__main__": 276 unittest.main() 277