/usr/share/pyshared/acct_mgr/db.py is in trac-accountmanager 0.4.3-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
#
# Copyright (C) 2007 Matthew Good <trac@matt-good.net>
# Copyright (C) 2010-2012 Steffen Hoffmann <hoff.st@web.de>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution.
#
# Author: Matthew Good <trac@matt-good.net>
from trac.core import Component, implements
from trac.config import ExtensionOption
from acct_mgr.api import IPasswordStore, _, N_
from acct_mgr.pwhash import IPasswordHashMethod
class SessionStore(Component):
    """Password store that keeps password hashes in `session_attribute`.

    Every account is represented by one row with `authenticated=1` and
    `name` equal to `self.key`; the `value` column of that row carries the
    hash generated by the configured hash method.
    """

    implements(IPasswordStore)

    hash_method = ExtensionOption(
        'account-manager', 'hash_method', IPasswordHashMethod,
        'HtDigestHashMethod',
        doc=N_("IPasswordHashMethod used to create new/updated passwords"))

    def __init__(self):
        # Value of the `name` column that marks a row as a password entry.
        self.key = 'password'
        # Evaluate the property once, so that a missing hash method gets
        # logged right when the component is instantiated.
        self.hash_method_enabled

    def get_users(self):
        """Generate the names of all users that have a password entry."""
        cnx = self.env.get_db_cnx()
        cursor = cnx.cursor()
        cursor.execute("""
            SELECT DISTINCT sid
            FROM session_attribute
            WHERE authenticated=1
            AND name=%s
            """, (self.key,))
        for row in cursor:
            yield row[0]

    def has_user(self, user):
        """Return True, if a password entry exists for `user`, else False."""
        cnx = self.env.get_db_cnx()
        cursor = cnx.cursor()
        cursor.execute("""
            SELECT *
            FROM session_attribute
            WHERE authenticated=1
            AND name=%s
            AND sid=%s
            """, (self.key, user))
        return cursor.fetchone() is not None

    def set_password(self, user, password, old_password=None):
        """Store a new password hash for `user`.

        An existing entry is overwritten, a missing one is created.
        `old_password` is accepted, but not evaluated by this store.

        Returns True, if a new account was created, and False, if an
        existing account was updated. Returns None without touching the
        database, if no hash method is available.
        """
        if not self.hash_method_enabled:
            return
        digest = self.hash_method.generate_hash(user, password)

        cnx = self.env.get_db_cnx()
        cursor = cnx.cursor()
        # Shared row selector for the UPDATE and the SELECT below.
        where = """
            WHERE authenticated=1
            AND name=%s
            AND sid=%s
            """
        selector = (self.key, user)

        # Blind update first; it simply matches nothing for a new user.
        cursor.execute("""
            UPDATE session_attribute
            SET value=%s
            """ + where, (digest,) + selector)
        # Look the row up afterwards to find out, if an insert is needed.
        cursor.execute("""
            SELECT value
            FROM session_attribute
            """ + where, selector)
        created = cursor.fetchone() is None
        if created:
            cursor.execute("""
                INSERT INTO session_attribute
                (sid,authenticated,name,value)
                VALUES (%s,1,%s,%s)
                """, (user, self.key, digest))
        cnx.commit()
        return created

    def check_password(self, user, password):
        """Verify `password` against the hash stored for `user`.

        Returns the result of the hash method's `check_hash()`, if an entry
        for the user exists. Returns None, if there is no entry or no hash
        method is available.
        """
        if not self.hash_method_enabled:
            return
        cnx = self.env.get_db_cnx()
        cursor = cnx.cursor()
        cursor.execute("""
            SELECT value
            FROM session_attribute
            WHERE authenticated=1
            AND name=%s
            AND sid=%s
            """, (self.key, user))
        row = cursor.fetchone()
        if row is None:
            # Return value 'None' allows to proceed with another,
            # chained store.
            return
        return self.hash_method.check_hash(user, password, row[0])

    def delete_user(self, user):
        """Remove the password entry of `user`.

        Returns True, if the account existed and was deleted, False
        otherwise.
        """
        cnx = self.env.get_db_cnx()
        cursor = cnx.cursor()
        where = """
            WHERE authenticated=1
            AND name=%s
            AND sid=%s
            """
        selector = (self.key, user)

        # The lookup is done on this cursor instead of calling has_user(),
        # so check and removal stay within one transaction.
        cursor.execute("""
            SELECT *
            FROM session_attribute
            """ + where, selector)
        found = cursor.fetchone() is not None
        if found:
            cursor.execute("""
                DELETE
                FROM session_attribute
                """ + where, selector)
            cnx.commit()
        return found

    @property
    def hash_method_enabled(self):
        """True, if the `hash_method` option can be read, else None.

        An AttributeError raised on access is logged as an error.
        """
        try:
            self.hash_method
        except AttributeError:
            self.env.log.error("%s: no IPasswordHashMethod enabled "
                               "- fatal, can't work" % self.__class__)
            return
        else:
            return True
|