/usr/share/pyshared/lamson/server.py is in python-lamson 1.0pre11-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 | """
The majority of the server related things Lamson needs to run, like receivers,
relays, and queue processors.
"""
import smtplib
import smtpd
import asyncore
import threading
import socket
import logging
from lamson import queue, mail, routing
import time
import traceback
from lamson.bounce import PRIMARY_STATUS_CODES, SECONDARY_STATUS_CODES, COMBINED_STATUS_CODES
def undeliverable_message(raw_message, failure_type):
"""
Used universally in this file to shove totally screwed messages
into the routing.Router.UNDELIVERABLE_QUEUE (if it's set).
"""
if routing.Router.UNDELIVERABLE_QUEUE:
key = routing.Router.UNDELIVERABLE_QUEUE.push(raw_message)
logging.error("Failed to deliver message because of %r, put it in "
"undeliverable queue with key %r", failure_type, key)
class SMTPError(Exception):
"""
You can raise this error when you want to abort with a SMTP error code to
the client. This is really only relevant when you're using the
SMTPReceiver and the client understands the error.
If you give a message than it'll use that, but it'll also produce a
consistent error message based on your code. It uses the errors in
lamson.bounce to produce them.
"""
def __init__(self, code, message=None):
self.code = code
self.message = message or self.error_for_code(code)
Exception.__init__(self, "%d %s" % (self.code, self.message))
def error_for_code(self, code):
primary, secondary, tertiary = str(code)
primary = PRIMARY_STATUS_CODES.get(primary, "")
secondary = SECONDARY_STATUS_CODES.get(secondary, "")
combined = COMBINED_STATUS_CODES.get(primary + secondary, "")
return " ".join([primary, secondary, combined]).strip()
class Relay(object):
"""
Used to talk to your "relay server" or smart host, this is probably the most
important class in the handlers next to the lamson.routing.Router.
It supports a few simple operations for sending mail, replying, and can
log the protocol it uses to stderr if you set debug=1 on __init__.
"""
def __init__(self, host='127.0.0.1', port=25, username=None, password=None,
ssl=False, starttls=False, debug=0):
"""
The hostname and port we're connecting to, and the debug level (default to 0).
Optional username and password for smtp authentication.
If ssl is True smtplib.SMTP_SSL will be used.
If starttls is True (and ssl False), smtp connection will be put in TLS mode.
It does the hard work of delivering messages to the relay host.
"""
self.hostname = host
self.port = port
self.debug = debug
self.username = username
self.password = password
self.ssl = ssl
self.starttls = starttls
def configure_relay(self, hostname):
if self.ssl:
relay_host = smtplib.SMTP_SSL(hostname, self.port)
else:
relay_host = smtplib.SMTP(hostname, self.port)
relay_host.set_debuglevel(self.debug)
if self.starttls:
relay_host.starttls()
if self.username and self.password:
relay_host.login(self.username, self.password)
assert relay_host, 'Code error, tell Zed.'
return relay_host
def deliver(self, message, To=None, From=None):
"""
Takes a fully formed email message and delivers it to the
configured relay server.
You can pass in an alternate To and From, which will be used in the
SMTP send lines rather than what's in the message.
"""
recipient = To or message['To']
sender = From or message['From']
hostname = self.hostname or self.resolve_relay_host(recipient)
try:
relay_host = self.configure_relay(hostname)
except socket.error:
logging.exception("Failed to connect to host %s:%d" % (hostname, self.port))
return
relay_host.sendmail(sender, recipient, str(message))
relay_host.quit()
def resolve_relay_host(self, To):
import DNS
address, target_host = To.split('@')
mx_hosts = DNS.mxlookup(target_host)
if not mx_hosts:
logging.debug("Domain %r does not have an MX record, using %r instead.", target_host, target_host)
return target_host
else:
logging.debug("Delivering to MX record %r for target %r", mx_hosts[0], target_host)
return mx_hosts[0][1]
def __repr__(self):
"""Used in logging and debugging to indicate where this relay goes."""
return "<Relay to (%s:%d)>" % (self.hostname, self.port)
def reply(self, original, From, Subject, Body):
"""Calls self.send but with the from and to of the original message reversed."""
self.send(original['from'], From=From, Subject=Subject, Body=Body)
def send(self, To, From, Subject, Body):
"""
Does what it says, sends an email. If you need something more complex
then look at lamson.mail.MailResponse.
"""
msg = mail.MailResponse(To=To, From=From, Subject=Subject, Body=Body)
self.deliver(msg)
class SMTPReceiver(smtpd.SMTPServer):
"""Receives emails and hands it to the Router for further processing."""
def __init__(self, host='127.0.0.1', port=8825):
"""
Initializes to bind on the given port and host/ipaddress. Typically
in deployment you'd give 0.0.0.0 for "all internet devices" but consult
your operating system.
This uses smtpd.SMTPServer in the __init__, which means that you have to
call this far after you use python-daemonize or else daemonize will
close the socket.
"""
self.host = host
self.port = port
smtpd.SMTPServer.__init__(self, (self.host, self.port), None)
def start(self):
"""
Kicks everything into gear and starts listening on the port. This
fires off threads and waits until they are done.
"""
logging.info("SMTPReceiver started on %s:%d." % (self.host, self.port))
self.poller = threading.Thread(target=asyncore.loop,
kwargs={'timeout':0.1, 'use_poll':True})
self.poller.start()
def process_message(self, Peer, From, To, Data):
"""
Called by smtpd.SMTPServer when there's a message received.
"""
try:
logging.debug("Message received from Peer: %r, From: %r, to To %r." % (Peer, From, To))
routing.Router.deliver(mail.MailRequest(Peer, From, To, Data))
except SMTPError, err:
# looks like they want to return an error, so send it out
return str(err)
undeliverable_message(Data, "Handler raised SMTPError on purpose: %s" % err)
except:
logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r." %
(Peer, From, To))
undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To))
def close(self):
"""Doesn't do anything except log who called this, since nobody should. Ever."""
logging.error(traceback.format_exc())
class QueueReceiver(object):
"""
Rather than listen on a socket this will watch a queue directory and
process messages it recieves from that. It works in almost the exact
same way otherwise.
"""
def __init__(self, queue_dir, sleep=10, size_limit=0, oversize_dir=None):
"""
The router should be fully configured and ready to work, the
queue_dir can be a fully qualified path or relative.
"""
self.queue = queue.Queue(queue_dir, pop_limit=size_limit,
oversize_dir=oversize_dir)
self.queue_dir = queue_dir
self.sleep = sleep
def start(self, one_shot=False):
"""
Start simply loops indefinitely sleeping and pulling messages
off for processing when they are available.
If you give one_shot=True it will run once rather than do a big
while loop with a sleep.
"""
logging.info("Queue receiver started on queue dir %s" %
(self.queue_dir))
logging.debug("Sleeping for %d seconds..." % self.sleep)
inq = queue.Queue(self.queue_dir)
while True:
key = None
try:
key, msg = inq.pop()
while key:
logging.debug("Pulled message with key: %r off", key)
self.process_message(msg)
key, msg = inq.pop()
if one_shot:
return
else:
time.sleep(self.sleep)
except:
logging.exception("Error popping from the queue, this might be a problem.")
undeliverable_message(exc.data, exc._message)
time.sleep(self.sleep)
def process_message(self, msg):
"""
Exactly the same as SMTPReceiver.process_message but just designed for the queue's
quirks.
"""
try:
Peer = self.queue_dir # this is probably harmless but I should check it
From = msg['from']
To = [msg['to']]
logging.debug("Message received from Peer: %r, From: %r, to To %r." % (Peer, From, To))
routing.Router.deliver(msg)
except SMTPError, err:
# looks like they want to return an error, so send it out
logging.exception("Raising SMTPError when running in a QueueReceiver is unsupported.")
undeliverable_message(msg.original, err.message)
except:
logging.exception("Exception while processing message from Peer: "
"%r, From: %r, to To %r." % (Peer, From, To))
undeliverable_message(msg.original, "Router failed to catch exception.")
|