/usr/lib/python3/dist-packages/SRS/Base.py is in python3-srs 1.0.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# $Log$
# Revision 1.4 2011/03/03 23:46:49 customdesigned
# Release 1.0
#
# Revision 1.3 2008/02/13 18:20:18 customdesigned
# Handle quoted localpart.
#
# Revision 1.2 2006/02/16 05:16:59 customdesigned
# Support SRS signing mode.
#
# Revision 1.1.1.2 2005/06/03 04:13:55 customdesigned
# Support sendmail socketmap
#
# Revision 1.3 2004/06/09 00:29:25 stuart
# Use hmac instead of straight sha
#
# Revision 1.2 2004/03/22 18:20:19 stuart
# Missing import
#
# Revision 1.1.1.1 2004/03/19 05:23:13 stuart
# Import to CVS
#
#
# AUTHOR
# Shevek
# CPAN ID: SHEVEK
# cpan@anarres.org
# http://www.anarres.org/projects/
#
# Translated to Python by stuart@bmsi.com
# http://bmsi.com/python/milter.html
#
# Portions Copyright (c) 2004 Shevek. All rights reserved.
# Portions Copyright (c) 2004 Business Management Systems. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Python itself.
from __future__ import print_function
import time
import hmac
try: from hashlib import sha1 as sha
except: import sha
import base64
import re
import SRS
import sys
# Candidate alphabets for encoding the timestamp.
BASE26 = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
BASE32 = BASE26 + '234567'
BASE64 = BASE26 + BASE26.lower() + '0123456789+/'

# There are two options: encode either a send date or an expiry date.
# Encoding a send date leaves the option of changing the expiry date
# later.  Encoding an expiry date allows different expiry dates for
# different sources/targets without having to store them.

# Do NOT use BASE64 here: the timestamp_check routine explicitly smashes
# case in the timestamp, so the alphabet must be case-insensitive.
BASE = BASE32

# The alphabet size must be a power of two, i.e. have exactly one bit
# set, because timestamps are built by masking with (base - 1).
base = len(BASE)
if (base & (base - 1)) != 0:
    raise ValueError("Invalid base array of size %d" % base)

PRECISION = 24 * 60 * 60    # Seconds per timestamp tick: one day
TICKSLOTS = base ** 2       # Distinct values of a two-character timestamp
def parse_addr(sender):
    """Split an address into ``(quotes, senduser, sendhost)``.

    The address is split at its LAST '@', so a quoted local part may
    itself contain '@'.  When the local part is wrapped in a pair of
    double quotes, the quotes are removed from ``senduser`` and
    ``quotes`` is '"' so the caller can put them back around a rewritten
    local part; otherwise ``quotes`` is ''.

    Raises ValueError if ``sender`` contains no '@'.
    """
    # Only the search for '@' can fail, so keep the try body minimal.
    try:
        pos = sender.rindex('@')
    except ValueError:
        raise ValueError("Sender '%s' must contain exactly one @" % sender)
    senduser = sender[:pos]
    sendhost = sender[pos + 1:]
    quotes = ''
    # Require at least two characters: a lone '"' both starts and ends
    # with a quote but is not a quoted string, and stripping it would
    # turn '"@host' into '""@host' when the address is reassembled.
    if (len(senduser) >= 2
            and senduser.startswith('"') and senduser.endswith('"')):
        senduser = senduser[1:-1]
        quotes = '"'
    return quotes, senduser, sendhost
class Base(object):
    """Common machinery for SRS address rewriters.

    Provides address signing/forwarding/reversal entry points, timestamp
    creation and checking, and HMAC based hash creation and verification.
    Subclasses supply the actual address format by overriding compile()
    and parse().
    """

    def __init__(self, secret=None, maxage=SRS.SRSMAXAGE,
                 hashlength=SRS.SRSHASHLENGTH,
                 hashmin=None, separator='=', alwaysrewrite=False,
                 ignoretimestamp=False, allowunsafesrs=False):
        """Configure the rewriter.

        secret -- a single secret, or a sequence of secrets with the
            current one first (hash_create() uses only the first one,
            hash_verify() tries them all).
        maxage -- maximum age of a timestamp, in units of PRECISION.
        hashlength -- number of characters of hash to emit.
        hashmin -- minimum hash length accepted by hash_verify();
            defaults to hashlength.
        separator -- initial separator; one of '-', '+' or '='.
        alwaysrewrite -- rewrite in forward() even if the sender and
            alias hosts are the same.
        ignoretimestamp -- make timestamp_check() always succeed.
        allowunsafesrs -- stored only; not used in this class.

        Raises ValueError for an unsupported separator.
        """
        if separator not in ('-', '+', '='):
            raise ValueError('separator must be = - or +, not %s' % separator)
        # A lone secret is wrapped so that self.secret is always a
        # sequence (or None).  bytes is included because indexing a bare
        # bytes secret would yield integers rather than the secret.
        if isinstance(secret, (str, bytes)):
            self.secret = (secret,)
        else:
            self.secret = secret
        self.maxage = maxage
        self.hashlength = hashlength
        self.hashmin = hashmin if hashmin else hashlength
        # NOTE: this instance attribute shadows the separator() method
        # defined at the end of the class; read it as a plain attribute.
        self.separator = separator
        self.alwaysrewrite = alwaysrewrite
        self.ignoretimestamp = ignoretimestamp
        self.allowunsafesrs = allowunsafesrs
        self.srs0re = re.compile(r'^%s[-+=]' % SRS.SRS0TAG, re.IGNORECASE)
        self.srs1re = re.compile(r'^%s[-+=]' % SRS.SRS1TAG, re.IGNORECASE)
        #self.ses0re = re.compile(r'^%s[-+=]' % SRS.SES0TAG,re.IGNORECASE)

    def warn(self, *msg):
        """Write the given strings, space separated, to stderr."""
        print('WARNING: ', ' '.join(msg), file=sys.stderr)

    def sign(self, sender):
        """srsaddress = srs.sign(sender)

        Map a sender address into the same sender and a cryptographic
        cookie.  Returns an SRS address to use for preventing bounce
        abuse; the domain of the result is the sender's own domain.

        There are alternative subclasses, some of which will return SRS
        compliant addresses, some will simply return non-SRS but valid
        RFC821 addresses.
        """
        quotes, senduser, sendhost = parse_addr(sender)
        # Subclasses may override the compile() method.
        srsdata = self.compile(sendhost, senduser, srshost=sendhost)
        return '%s%s%s@%s' % (quotes, srsdata, quotes, sendhost)

    def forward(self, sender, alias, sign=False):
        """srsaddress = srs.forward(sender, alias)

        Map a sender address into a new sender and a cryptographic
        cookie.  Returns an SRS address to use as the new sender.  If the
        alias host equals the sender host (ignoring case) and
        alwaysrewrite is not set, the sender is returned unchanged.
        When sign is true the alias host is passed to compile() as
        srshost.

        There are alternative subclasses, some of which will return SRS
        compliant addresses, some will simply return non-SRS but valid
        RFC821 addresses.
        """
        quotes, senduser, sendhost = parse_addr(sender)
        # We don't require alias to be a full address, just a domain
        # will do.
        aliashost = alias.split('@')[-1]
        if aliashost.lower() == sendhost.lower() and not self.alwaysrewrite:
            return '%s%s%s@%s' % (quotes, senduser, quotes, sendhost)
        # Subclasses may override the compile() method.
        if sign:
            srsdata = self.compile(sendhost, senduser, srshost=aliashost)
        else:
            srsdata = self.compile(sendhost, senduser)
        return '%s%s%s@%s' % (quotes, srsdata, quotes, aliashost)

    def reverse(self, address):
        """sender = srs.reverse(srsaddress)

        Reverse the mapping to get back the original address.  All
        validation of cryptographic and timestamp information is
        delegated to parse(); whatever parse() raises for an address
        that cannot be reversed propagates to the caller.  Returns the
        original sender address.
        """
        quotes, user, host = parse_addr(address)
        sendhost, senduser = self.parse(user, srshost=host)
        return '%s%s%s@%s' % (quotes, senduser, quotes, sendhost)

    def compile(self, sendhost, senduser, srshost=None):
        """srsdata = srs.compile(host,user)

        This method, designed to be overridden by subclasses, takes as
        parameters the original host and user and must compile a new
        username for the SRS transformed address.  It is expected that
        this new username will be joined on SRS.SRSSEP, and will contain
        a hash generated from self.hash_create(...), and possibly a
        timestamp generated by self.timestamp_create().

        srshost is accepted because sign() and forward() pass it by
        keyword; without it the base stub would raise TypeError rather
        than NotImplementedError.
        """
        raise NotImplementedError()

    def parse(self, srsuser, srshost=None):
        """host,user = srs.parse(srsuser)

        This method, designed to be overridden by subclasses, takes an
        SRS-transformed username as an argument, and must reverse the
        transformation produced by compile().  It is required to verify
        any hash and timestamp in the parsed data, using
        self.hash_verify(hash, ...) and self.timestamp_check(timestamp).

        srshost is accepted because reverse() passes it by keyword.
        """
        raise NotImplementedError()

    def timestamp_create(self, ts=None):
        """timestamp = srs.timestamp_create(time)

        Return a two character timestamp representing 'today', or time
        if given.  time is a Unix timestamp (seconds since the epoch).

        The function is agnostic as to base; in practice base32 is used
        since it can be reversed even if a remote MTA smashes case (in
        violation of RFC2821 section 2.4).  Because of that agnosticism
        it divides by the base instead of shifting right; with base32 a
        C implementation could use a right shift by 5.
        """
        if not ts:
            ts = time.time()
        # Since we only mask in the bottom few bits anyway, we don't
        # need to take this modulo anything (e.g. TICKSLOTS).
        ts = int(ts // PRECISION)
        mask = base - 1
        low = BASE[ts & mask]
        ts //= base     # Equivalent to a right shift; base is a power of 2.
        return BASE[ts & mask] + low

    def timestamp_check(self, timestamp):
        """srs.timestamp_check(timestamp)

        Return True if a timestamp is valid, False otherwise.  There are
        TICKSLOTS possible timestamps, used in a cycle.  At any time,
        self.maxage timestamps in this cycle are valid, the last one
        being today.  A timestamp from the future is not valid, neither
        is a timestamp from too far into the past.  Of course if you go
        far enough into the future, the cycle wraps around, and there
        are valid timestamps again.

        An empty timestamp, or one containing a character outside the
        BASE alphabet, is rejected.  Always returns True when
        ignoretimestamp is set.
        """
        if self.ignoretimestamp:
            return True
        if not timestamp:
            return False
        then = 0
        for char in timestamp.upper():  # LOOK OUT - USE BASE32
            digit = BASE.find(char)
            if digit < 0:
                # Not in the alphabet; find() returned -1, which must
                # not be folded into the decoded value.
                return False
            then = then * base + digit
        now = int(time.time() // PRECISION) % TICKSLOTS
        # Age in ticks, measured forwards around the cycle from `then`.
        age = (now - then) % TICKSLOTS
        return age <= self.maxage

    def time_check(self, ts):
        """srs.time_check(time)

        Similar to srs.timestamp_check(timestamp), but takes a Unix
        time, and checks that an alias created at that Unix time is
        still valid.  This is designed for use by subclasses with
        storage backends.
        """
        return time.time() <= (ts + (self.maxage * PRECISION))

    @staticmethod
    def _as_bytes(value):
        """Return value as bytes, encoding text strings."""
        if isinstance(value, bytes):
            return value
        return value.encode()

    def _require_secret(self):
        """Return the configured secrets, raising if there are none."""
        secret = self.get_secret()
        if not secret:
            # Raised explicitly (rather than via an assert statement) so
            # the check is not stripped when Python runs with -O.
            raise AssertionError(
                "Cannot create a cryptographic MAC without a secret")
        return secret

    def _mac(self, secret, data):
        """Return the base64 HMAC of the lowercased data items as text."""
        h = hmac.new(self._as_bytes(secret), b'', sha)
        for item in data:
            h.update(self._as_bytes(item.lower()))
        encoded = base64.b64encode(h.digest())
        if not isinstance(encoded, str):
            # b64encode yields bytes on Python 3; callers build
            # addresses out of text strings.
            encoded = encoded.decode('ascii')
        return encoded

    def hash_create(self, *data):
        """srs.hash_create(data,...)

        Returns a cryptographic hash of all data in data, truncated to
        self.hashlength characters.  Any piece of data encoded into an
        address which must remain inviolate should be hashed, so that
        when the address is reversed, we can check that this data has
        not been tampered with.  You should provide at least one piece
        of data to this method (otherwise this system is both
        cryptographically weak and there may be collision problems with
        sender addresses).  Only the first secret is used.

        Raises AssertionError if no secret is configured.
        """
        secret = self._require_secret()
        return self._mac(secret[0], data)[:self.hashlength]

    def hash_verify(self, hash, *data):
        """srs.hash_verify(hash,data,...)

        Verify that data has not been tampered with, given the
        cryptographic hash previously output by srs.hash_create().
        Returns True or False.  All known secrets are tried in order to
        see if the hash was created with an old secret.  A hash shorter
        than self.hashmin is rejected outright.

        Raises AssertionError if no secret is configured.
        """
        if len(hash) < self.hashmin:
            return False
        secret = self._require_secret()
        if isinstance(hash, bytes) and not isinstance(hash, str):
            # Compare as text; undecodable bytes simply fail to match.
            hash = hash.decode('ascii', 'replace')
        candidates = [self._mac(s, data)[:len(hash)] for s in secret]
        # We test all case sensitive matches before case insensitive
        # matches.  While the risk of a case insensitive collision is
        # quite low, we might as well be careful.
        if hash in candidates:
            return True
        lowered = hash.lower()
        for candidate in candidates:
            if lowered == candidate.lower():
                self.warn("SRS: Case insensitive hash match detected.\n"
                          "Someone smashed case in the local-part.")
                return True
        return False

    def set_secret(self, *args):
        """srs.set_secret(new,old,...)

        Replace the rewriter's secrets with the ones given, current
        secret first.  When an address is returned, all secrets are
        tried to see if the hash can be validated.  Don't use "foo",
        "secret", "password", "10downing", "god" or "wednesday" as your
        secret.
        """
        self.secret = args

    def get_secret(self):
        "Return the list of secrets. These are secret. Don't publish them."
        return self.secret

    def separator(self):
        """srs.separator()

        Return the initial separator, which follows the SRS tag.  This
        is only used as the initial separator, for the convenience of
        administrators who wish to make srs0 and srs1 users on their
        mail servers and require to use + or - as the user delimiter.
        All other separators in the SRS address must be '='.

        NOTE(review): __init__ stores the separator string in an
        instance attribute of the same name, which hides this method on
        initialised instances -- confirm whether any caller relies on
        calling it.
        """
        return self.separator
|