Package cherrypy :: Package test :: Module test_session
[hide private]
[frames] | no frames]

Source Code for Module cherrypy.test.test_session

  1  import os 
  2  localDir = os.path.dirname(__file__) 
  3  import sys 
  4  import threading 
  5  import time 
  6   
  7  import cherrypy 
  8  from cherrypy._cpcompat import copykeys, HTTPConnection, HTTPSConnection 
  9  from cherrypy.lib import sessions 
 10  from cherrypy.lib.httputil import response_codes 
 11   
 12   
13 -def http_methods_allowed(methods=['GET', 'HEAD']):
14 method = cherrypy.request.method.upper() 15 if method not in methods: 16 cherrypy.response.headers['Allow'] = ", ".join(methods) 17 raise cherrypy.HTTPError(405)
18 19 cherrypy.tools.allow = cherrypy.Tool('on_start_resource', http_methods_allowed) 20 21
22 -def setup_server():
23 24 class Root: 25 26 _cp_config = {'tools.sessions.on': True, 27 'tools.sessions.storage_type': 'ram', 28 'tools.sessions.storage_path': localDir, 29 'tools.sessions.timeout': (1.0 / 60), 30 'tools.sessions.clean_freq': (1.0 / 60), 31 } 32 33 def clear(self): 34 cherrypy.session.cache.clear()
35 clear.exposed = True 36 37 def data(self): 38 cherrypy.session['aha'] = 'foo' 39 return repr(cherrypy.session._data) 40 data.exposed = True 41 42 def testGen(self): 43 counter = cherrypy.session.get('counter', 0) + 1 44 cherrypy.session['counter'] = counter 45 yield str(counter) 46 testGen.exposed = True 47 48 def testStr(self): 49 counter = cherrypy.session.get('counter', 0) + 1 50 cherrypy.session['counter'] = counter 51 return str(counter) 52 testStr.exposed = True 53 54 def setsessiontype(self, newtype): 55 self.__class__._cp_config.update( 56 {'tools.sessions.storage_type': newtype}) 57 if hasattr(cherrypy, "session"): 58 del cherrypy.session 59 cls = getattr(sessions, newtype.title() + 'Session') 60 if cls.clean_thread: 61 cls.clean_thread.stop() 62 cls.clean_thread.unsubscribe() 63 del cls.clean_thread 64 setsessiontype.exposed = True 65 setsessiontype._cp_config = {'tools.sessions.on': False} 66 67 def index(self): 68 sess = cherrypy.session 69 c = sess.get('counter', 0) + 1 70 time.sleep(0.01) 71 sess['counter'] = c 72 return str(c) 73 index.exposed = True 74 75 def keyin(self, key): 76 return str(key in cherrypy.session) 77 keyin.exposed = True 78 79 def delete(self): 80 cherrypy.session.delete() 81 sessions.expire() 82 return "done" 83 delete.exposed = True 84 85 def delkey(self, key): 86 del cherrypy.session[key] 87 return "OK" 88 delkey.exposed = True 89 90 def blah(self): 91 return self._cp_config['tools.sessions.storage_type'] 92 blah.exposed = True 93 94 def iredir(self): 95 raise cherrypy.InternalRedirect('/blah') 96 iredir.exposed = True 97 98 def restricted(self): 99 return cherrypy.request.method 100 restricted.exposed = True 101 restricted._cp_config = {'tools.allow.on': True, 102 'tools.allow.methods': ['GET']} 103 104 def regen(self): 105 cherrypy.tools.sessions.regenerate() 106 return "logged in" 107 regen.exposed = True 108 109 def length(self): 110 return str(len(cherrypy.session)) 111 length.exposed = True 112 113 def session_cookie(self): 114 # Must load() to start the clean thread. 115 cherrypy.session.load() 116 return cherrypy.session.id 117 session_cookie.exposed = True 118 session_cookie._cp_config = { 119 'tools.sessions.path': '/session_cookie', 120 'tools.sessions.name': 'temp', 121 'tools.sessions.persistent': False} 122 123 cherrypy.tree.mount(Root()) 124 125 126 from cherrypy.test import helper 127 128
129 -class SessionTest(helper.CPWebCase):
130 setup_server = staticmethod(setup_server) 131
132 - def tearDown(self):
133 # Clean up sessions. 134 for fname in os.listdir(localDir): 135 if fname.startswith(sessions.FileSession.SESSION_PREFIX): 136 os.unlink(os.path.join(localDir, fname))
137
138 - def test_0_Session(self):
139 self.getPage('/setsessiontype/ram') 140 self.getPage('/clear') 141 142 # Test that a normal request gets the same id in the cookies. 143 # Note: this wouldn't work if /data didn't load the session. 144 self.getPage('/data') 145 self.assertBody("{'aha': 'foo'}") 146 c = self.cookies[0] 147 self.getPage('/data', self.cookies) 148 self.assertEqual(self.cookies[0], c) 149 150 self.getPage('/testStr') 151 self.assertBody('1') 152 cookie_parts = dict([p.strip().split('=') 153 for p in self.cookies[0][1].split(";")]) 154 # Assert there is an 'expires' param 155 self.assertEqual(set(cookie_parts.keys()), 156 set(['session_id', 'expires', 'Path'])) 157 self.getPage('/testGen', self.cookies) 158 self.assertBody('2') 159 self.getPage('/testStr', self.cookies) 160 self.assertBody('3') 161 self.getPage('/data', self.cookies) 162 self.assertBody("{'aha': 'foo', 'counter': 3}") 163 self.getPage('/length', self.cookies) 164 self.assertBody('2') 165 self.getPage('/delkey?key=counter', self.cookies) 166 self.assertStatus(200) 167 168 self.getPage('/setsessiontype/file') 169 self.getPage('/testStr') 170 self.assertBody('1') 171 self.getPage('/testGen', self.cookies) 172 self.assertBody('2') 173 self.getPage('/testStr', self.cookies) 174 self.assertBody('3') 175 self.getPage('/delkey?key=counter', self.cookies) 176 self.assertStatus(200) 177 178 # Wait for the session.timeout (1 second) 179 time.sleep(2) 180 self.getPage('/') 181 self.assertBody('1') 182 self.getPage('/length', self.cookies) 183 self.assertBody('1') 184 185 # Test session __contains__ 186 self.getPage('/keyin?key=counter', self.cookies) 187 self.assertBody("True") 188 cookieset1 = self.cookies 189 190 # Make a new session and test __len__ again 191 self.getPage('/') 192 self.getPage('/length', self.cookies) 193 self.assertBody('2') 194 195 # Test session delete 196 self.getPage('/delete', self.cookies) 197 self.assertBody("done") 198 self.getPage('/delete', cookieset1) 199 self.assertBody("done") 200 f = lambda: [ 201 x for x in os.listdir(localDir) if x.startswith('session-')] 202 self.assertEqual(f(), []) 203 204 # Wait for the cleanup thread to delete remaining session files 205 self.getPage('/') 206 f = lambda: [ 207 x for x in os.listdir(localDir) if x.startswith('session-')] 208 self.assertNotEqual(f(), []) 209 time.sleep(2) 210 self.assertEqual(f(), [])
211
212 - def test_1_Ram_Concurrency(self):
213 self.getPage('/setsessiontype/ram') 214 self._test_Concurrency()
215
216 - def test_2_File_Concurrency(self):
217 self.getPage('/setsessiontype/file') 218 self._test_Concurrency()
219
220 - def _test_Concurrency(self):
221 client_thread_count = 5 222 request_count = 30 223 224 # Get initial cookie 225 self.getPage("/") 226 self.assertBody("1") 227 cookies = self.cookies 228 229 data_dict = {} 230 errors = [] 231 232 def request(index): 233 if self.scheme == 'https': 234 c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT)) 235 else: 236 c = HTTPConnection('%s:%s' % (self.interface(), self.PORT)) 237 for i in range(request_count): 238 c.putrequest('GET', '/') 239 for k, v in cookies: 240 c.putheader(k, v) 241 c.endheaders() 242 response = c.getresponse() 243 body = response.read() 244 if response.status != 200 or not body.isdigit(): 245 errors.append((response.status, body)) 246 else: 247 data_dict[index] = max(data_dict[index], int(body))
248 # Uncomment the following line to prove threads overlap. 249 ## sys.stdout.write("%d " % index) 250 251 # Start <request_count> requests from each of 252 # <client_thread_count> concurrent clients 253 ts = [] 254 for c in range(client_thread_count): 255 data_dict[c] = 0 256 t = threading.Thread(target=request, args=(c,)) 257 ts.append(t) 258 t.start() 259 260 for t in ts: 261 t.join() 262 263 hitcount = max(data_dict.values()) 264 expected = 1 + (client_thread_count * request_count) 265 266 for e in errors: 267 print(e) 268 self.assertEqual(hitcount, expected)
269
270 - def test_3_Redirect(self):
271 # Start a new session 272 self.getPage('/testStr') 273 self.getPage('/iredir', self.cookies) 274 self.assertBody("file")
275
276 - def test_4_File_deletion(self):
277 # Start a new session 278 self.getPage('/testStr') 279 # Delete the session file manually and retry. 280 id = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1] 281 path = os.path.join(localDir, "session-" + id) 282 os.unlink(path) 283 self.getPage('/testStr', self.cookies)
284
285 - def test_5_Error_paths(self):
286 self.getPage('/unknown/page') 287 self.assertErrorPage(404, "The path '/unknown/page' was not found.") 288 289 # Note: this path is *not* the same as above. The above 290 # takes a normal route through the session code; this one 291 # skips the session code's before_handler and only calls 292 # before_finalize (save) and on_end (close). So the session 293 # code has to survive calling save/close without init. 294 self.getPage('/restricted', self.cookies, method='POST') 295 self.assertErrorPage(405, response_codes[405][1])
296
297 - def test_6_regenerate(self):
298 self.getPage('/testStr') 299 # grab the cookie ID 300 id1 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1] 301 self.getPage('/regen') 302 self.assertBody('logged in') 303 id2 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1] 304 self.assertNotEqual(id1, id2) 305 306 self.getPage('/testStr') 307 # grab the cookie ID 308 id1 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1] 309 self.getPage('/testStr', 310 headers=[ 311 ('Cookie', 312 'session_id=maliciousid; ' 313 'expires=Sat, 27 Oct 2017 04:18:28 GMT; Path=/;')]) 314 id2 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1] 315 self.assertNotEqual(id1, id2) 316 self.assertNotEqual(id2, 'maliciousid')
317
318 - def test_7_session_cookies(self):
319 self.getPage('/setsessiontype/ram') 320 self.getPage('/clear') 321 self.getPage('/session_cookie') 322 # grab the cookie ID 323 cookie_parts = dict([p.strip().split('=') 324 for p in self.cookies[0][1].split(";")]) 325 # Assert there is no 'expires' param 326 self.assertEqual(set(cookie_parts.keys()), set(['temp', 'Path'])) 327 id1 = cookie_parts['temp'] 328 self.assertEqual(copykeys(sessions.RamSession.cache), [id1]) 329 330 # Send another request in the same "browser session". 331 self.getPage('/session_cookie', self.cookies) 332 cookie_parts = dict([p.strip().split('=') 333 for p in self.cookies[0][1].split(";")]) 334 # Assert there is no 'expires' param 335 self.assertEqual(set(cookie_parts.keys()), set(['temp', 'Path'])) 336 self.assertBody(id1) 337 self.assertEqual(copykeys(sessions.RamSession.cache), [id1]) 338 339 # Simulate a browser close by just not sending the cookies 340 self.getPage('/session_cookie') 341 # grab the cookie ID 342 cookie_parts = dict([p.strip().split('=') 343 for p in self.cookies[0][1].split(";")]) 344 # Assert there is no 'expires' param 345 self.assertEqual(set(cookie_parts.keys()), set(['temp', 'Path'])) 346 # Assert a new id has been generated... 347 id2 = cookie_parts['temp'] 348 self.assertNotEqual(id1, id2) 349 self.assertEqual(set(sessions.RamSession.cache.keys()), 350 set([id1, id2])) 351 352 # Wait for the session.timeout on both sessions 353 time.sleep(2.5) 354 cache = copykeys(sessions.RamSession.cache) 355 if cache: 356 if cache == [id2]: 357 self.fail("The second session did not time out.") 358 else: 359 self.fail("Unknown session id in cache: %r", cache)
360 361 362 import socket 363 try: 364 import memcache 365 366 host, port = '127.0.0.1', 11211 367 for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, 368 socket.SOCK_STREAM): 369 af, socktype, proto, canonname, sa = res 370 s = None 371 try: 372 s = socket.socket(af, socktype, proto) 373 # See http://groups.google.com/group/cherrypy-users/ 374 # browse_frm/thread/bbfe5eb39c904fe0 375 s.settimeout(1.0) 376 s.connect((host, port)) 377 s.close() 378 except socket.error: 379 if s: 380 s.close() 381 raise 382 break 383 except (ImportError, socket.error):
384 - class MemcachedSessionTest(helper.CPWebCase):
385 setup_server = staticmethod(setup_server) 386
387 - def test(self):
388 return self.skip("memcached not reachable ")
389 else:
390 - class MemcachedSessionTest(helper.CPWebCase):
391 setup_server = staticmethod(setup_server) 392
393 - def test_0_Session(self):
394 self.getPage('/setsessiontype/memcached') 395 396 self.getPage('/testStr') 397 self.assertBody('1') 398 self.getPage('/testGen', self.cookies) 399 self.assertBody('2') 400 self.getPage('/testStr', self.cookies) 401 self.assertBody('3') 402 self.getPage('/length', self.cookies) 403 self.assertErrorPage(500) 404 self.assertInBody("NotImplementedError") 405 self.getPage('/delkey?key=counter', self.cookies) 406 self.assertStatus(200) 407 408 # Wait for the session.timeout (1 second) 409 time.sleep(1.25) 410 self.getPage('/') 411 self.assertBody('1') 412 413 # Test session __contains__ 414 self.getPage('/keyin?key=counter', self.cookies) 415 self.assertBody("True") 416 417 # Test session delete 418 self.getPage('/delete', self.cookies) 419 self.assertBody("done")
420
421 - def test_1_Concurrency(self):
422 client_thread_count = 5 423 request_count = 30 424 425 # Get initial cookie 426 self.getPage("/") 427 self.assertBody("1") 428 cookies = self.cookies 429 430 data_dict = {} 431 432 def request(index): 433 for i in range(request_count): 434 self.getPage("/", cookies) 435 # Uncomment the following line to prove threads overlap. 436 ## sys.stdout.write("%d " % index) 437 if not self.body.isdigit(): 438 self.fail(self.body) 439 data_dict[index] = v = int(self.body)
440 441 # Start <request_count> concurrent requests from 442 # each of <client_thread_count> clients 443 ts = [] 444 for c in range(client_thread_count): 445 data_dict[c] = 0 446 t = threading.Thread(target=request, args=(c,)) 447 ts.append(t) 448 t.start() 449 450 for t in ts: 451 t.join() 452 453 hitcount = max(data_dict.values()) 454 expected = 1 + (client_thread_count * request_count) 455 self.assertEqual(hitcount, expected)
456
457 - def test_3_Redirect(self):
458 # Start a new session 459 self.getPage('/testStr') 460 self.getPage('/iredir', self.cookies) 461 self.assertBody("memcached")
462
463 - def test_5_Error_paths(self):
464 self.getPage('/unknown/page') 465 self.assertErrorPage( 466 404, "The path '/unknown/page' was not found.") 467 468 # Note: this path is *not* the same as above. The above 469 # takes a normal route through the session code; this one 470 # skips the session code's before_handler and only calls 471 # before_finalize (save) and on_end (close). So the session 472 # code has to survive calling save/close without init. 473 self.getPage('/restricted', self.cookies, method='POST') 474 self.assertErrorPage(405, response_codes[405][1])
475