/usr/share/pyshared/albatross/sessionfile.py is in python-albatross 1.36-5.5.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 | #
# Copyright 2001 by Object Craft P/L, Melbourne, Australia.
#
# LICENCE - see LICENCE file distributed with this software for details.
#
# Note: session_dir should not be a publically readable directory, or
# local users will be able to steal sessions.
import os
import stat
import sys
import binascii
import random
import errno
import struct
import os.path
from albatross.app import AppContext, Application, PageObjectMixin,\
PageModuleMixin
from albatross.context import SessionBase, NameRecorderMixin,\
PickleSignMixin, CachingTemplateLoaderMixin
from albatross.randompage import RandomPageModuleMixin
from albatross.session import SessionCookieMixin
from albatross.common import *
# File based session recording
class SessionFileContextMixin(SessionCookieMixin, SessionBase):
def __init__(self):
SessionBase.__init__(self)
self.__sesid = None
def sesid(self):
return self.__sesid
def load_session(self):
sesid = self._get_sesid_from_cookie()
text = None
if sesid:
sesid = os.path.basename(sesid) # Safety
text = self.app.get_session(sesid)
if not text:
if self.request.get_method().upper() != 'GET':
raise SessionExpired('Session expired or browser does not support cookies')
sesid = self.app.new_session()
self.__sesid = sesid
if text:
try:
self.decode_session(text)
except:
self.app.del_session(sesid)
raise
self._set_sesid_cookie(self.__sesid)
def save_session(self):
if self.should_save_session() and self.__sesid is not None:
text = self.encode_session()
self.app.put_session(self.__sesid, text)
def remove_session(self):
SessionBase.remove_session(self)
if self.__sesid is not None:
self.app.del_session(self.__sesid)
sesid = self.app.new_session()
self.__sesid = sesid
self._set_sesid_cookie(self.__sesid)
class SessionFileAppMixin:
def __init__(self, session_appid, session_dir, session_age = 1800):
self.__appid = session_appid
self.__session_dir = session_dir
self.__age = session_age
def ses_appid(self):
return self.__appid
def ses_age(self):
return self.__age
def get_session(self, sesid):
text = None
try:
text = open(self._ses_filename(sesid), "rb").read()
except IOError, (eno, estr):
if eno != errno.ENOENT:
self.del_session(sesid)
raise ServerError("read session file: %s: %s" %
(self._ses_filename(sesid), estr))
return text
def new_session(self):
while 1:
try:
text = open('/dev/urandom').read(8)
except (OSError, IOError):
text = struct.pack('d', random.random())
sesid = binascii.hexlify(text)
try:
fd = os.open(self._ses_filename(sesid),
os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0700)
except OSError, (eno, estr):
if eno == errno.ENOENT:
try:
os.makedirs(self.__session_dir, 0700)
except OSError, (eno, estr):
raise ServerError("create session dir: %s: %s" %
(self.__session_dir, estr))
continue
if eno != errno.EEXIST:
raise ServerError("new_session: %s: %s" %
(self._ses_filename(sesid), estr))
continue
os.close(fd)
return sesid
def put_session(self, sesid, text):
try:
file = open(self._ses_filename(sesid), "wb")
file.write(text)
file.close()
except IOError, (eno, estr):
raise ServerError("write session file: %s: %s" %
(self._ses_filename(sesid), estr))
def del_session(self, sesid):
try:
os.remove(self._ses_filename(sesid))
except OSError:
pass
def _ses_filename(self, sesid):
if os.path.dirname(sesid) != '':
raise SecurityError("bad session id: %s" % (sesid))
return os.path.join(self.__session_dir, sesid + '.als')
class SessionFileAppContext(AppContext,
NameRecorderMixin,
SessionFileContextMixin):
def __init__(self, app):
AppContext.__init__(self, app)
NameRecorderMixin.__init__(self)
SessionFileContextMixin.__init__(self)
class SimpleSessionFileApp(PickleSignMixin,
Application,
CachingTemplateLoaderMixin,
PageObjectMixin,
SessionFileAppMixin):
def __init__(self, base_url, template_path, start_page, secret,
session_appid, session_dir):
Application.__init__(self, base_url)
PickleSignMixin.__init__(self, secret)
CachingTemplateLoaderMixin.__init__(self, template_path)
PageObjectMixin.__init__(self, start_page)
SessionFileAppMixin.__init__(self, session_appid, session_dir)
def create_context(self):
return SessionFileAppContext(self)
class ModularSessionFileApp(PickleSignMixin,
Application,
CachingTemplateLoaderMixin,
PageModuleMixin,
SessionFileAppMixin):
def __init__(self, base_url, module_path, template_path, start_page, secret,
session_appid, session_dir):
Application.__init__(self, base_url)
PickleSignMixin.__init__(self, secret)
CachingTemplateLoaderMixin.__init__(self, template_path)
PageModuleMixin.__init__(self, module_path, start_page)
SessionFileAppMixin.__init__(self, session_appid, session_dir)
def create_context(self):
return SessionFileAppContext(self)
class RandomModularSessionFileApp(PickleSignMixin,
Application,
CachingTemplateLoaderMixin,
RandomPageModuleMixin,
SessionFileAppMixin):
def __init__(self, base_url, page_path, start_page, secret,
session_appid, session_dir):
Application.__init__(self, base_url)
PickleSignMixin.__init__(self, secret)
CachingTemplateLoaderMixin.__init__(self, page_path)
RandomPageModuleMixin.__init__(self, page_path, start_page)
SessionFileAppMixin.__init__(self, session_appid, session_dir)
def create_context(self):
return SessionFileAppContext(self)
|