This file is indexed.

/usr/share/pyshared/lamson/spam.py is in python-lamson 1.0pre11-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""
Uses the SpamBayes system to perform filtering and classification
of email.  It's designed so that you attach a single decorator
to the state functions you need to be "spam free", and then use the
lamson.spam.Filter code to do training.

SpamBayes comes with extensive command line tools for processing
maildir and mbox for spam.  A good way to train SpamBayes is to 
take mail that you know is spam and stuff it into a maildir, then
periodically use the SpamBayes tools to train from that.
"""

from functools import wraps
from lamson import queue
from spambayes import hammie, Options, storage
import os
import logging

class Filter(object):
    """
    This code implements simple filtering and is taken from the
    SpamBayes documentation.
    """
    def __init__(self, storage_file, config):
        options = Options.options
        options["Storage", "persistent_storage_file"] = storage_file
        options.merge_files(['/etc/hammierc', os.path.expanduser(config)])

        self.include_trained = Options.options["Headers", "include_trained"]
        self.dbname, self.usedb = storage.database_type([])

        self.mode = None
        self.h = None

        assert not Options.options["Hammie", "train_on_filter"], "Cannot train_on_filter."

    def open(self, mode):
        assert not self.h, "Cannot reopen, close first."
        assert not self.mode, "Mode should be None on open, bad state."
        assert mode in ['r', 'c'], "Must give a valid mode: r, c."

        self.mode = mode
        self.h = hammie.open(self.dbname, self.usedb, self.mode)

    def close(self):
        if not self.h: return

        assert self.mode, "Mode was not set."
        assert self.mode in ['r','c'], "self.mode was not r or c. Bad state."

        if self.mode == 'c':
            self.h.store()
            self.h.close()

        self.h = None
        self.mode = None


    def filter(self, msg):
        self.open('r')
        result = self.h.filter(msg)
        self.close()
        return result

    def train_ham(self, msg):
        self.open('c')
        self.h.train_ham(msg, self.include_trained)
        self.close()

    def train_spam(self, msg):
        self.open('c')
        self.h.train_spam(msg, self.include_trained)
        self.close()

    def untrain_ham(self, msg):
        self.open('c')
        self.h.untrain_ham(msg)
        self.close()

    def untrain_spam(self, msg):
        self.open('c')
        self.h.untrain_spam(msg)
        self.close()




class spam_filter(object):
    """
    This is a decorator you attach to states that should be protected from spam.
    You use it by doing:

        @spam_filter(ham_db, rcfile, spam_dump_queue, next_state=SPAMMING)

    Where ham_db is the path to your hamdb configuration, rcfile is the 
    SpamBayes config, and spam_dump_queue is where this filter should
    dump spam it detects.

    The next_state argument is optional, defaulting to None, but if you use
    it then Lamson will transition that user into that state.  Use it to mark
    that address as a spammer and to ignore their emails or do something
    fancy with them.
    """

    def __init__(self, storage, config, spam_queue, next_state=None):
        self.storage = storage
        self.config = config
        self.spam_queue = spam_queue
        self.next_state = next_state
        assert self.next_state, "You must give next_state function."

        if not os.path.exists(self.storage):
            logging.warn("SPAM filter for %r does not have a valid storage path, it'll still run but won't do anything.",
                        (self.storage, self.config, self.spam_queue,
                         self.next_state.__name__))
            self.functioning = False
        else:
            self.functioning = True

    def __call__(self, fn):
        @wraps(fn)
        def category_wrapper(message, *args, **kw):
            if self.functioning:
                if self.spam(message.to_message()):
                    self.enqueue_as_spam(message.to_message())
                    return self.next_state
                else:
                    return fn(message, *args, **kw)
            else:
                return fn(message, *args, **kw)
        return category_wrapper

    def spam(self, message):
        """Determines if the message is spam or not."""
        spfilter = Filter(self.storage, self.config)
        spfilter.filter(message)

        if 'X-Spambayes-Classification' in message:
            return message['X-Spambayes-Classification'].startswith('spam')
        else:
            return False

    def enqueue_as_spam(self, message):
        """Drops the message into the configured spam queue."""
        outq = queue.Queue(self.spam_queue)
        outq.push(str(message))