This file is indexed.

/usr/share/pyaimt/src/tlib/scheduler.py is in pyaimt 0.8.0.1-4.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#This scheduler works by creating a separate thread for each class of packets, and letting these threads
#manage the execution of their respective messages. If a thread finds that it has no more messages in its
#queue even after waiting a short while the thread will terminate, saving its state. This is also the exit
#condition for the Scheduler; there is no way to shut down all threads when a user logs out. If another
#message shows up destined for a terminated thread, the thread is recreated from the data it saved when
#it exited.
#
#I believe this will lead to two or three threads per online user after the startup period is over.

import threading
import time
import Queue
import sys

class Scheduler(object):
    def __init__(self,handler):
        self.freezer={}
        self.handler=handler
        self.bigLock=threading.Lock()
        # Create default class, just in case
        self.threads={'default' : self.QueueThread('default',handler,self.freezer)}
        self.snacs={'default':'default'}

    def enqueue(self,fam,sub,snac):
        snacid=str(fam)+str(sub)
        if (not self.snacs.has_key(snacid)):
            # We don't have a class, so assume class "default"
            snacid='default'
        classid=self.snacs[snacid]
        self.bigLock.acquire()
        if (not self.threads.has_key(classid) or not self.threads[classid].isAlive()):
            self.threads[classid]=self.QueueThread(classid,self.handler,self.freezer)
        self.threads[classid].enqueue(snac)
        self.bigLock.release()
  
    def stop(self):
        self.bigLock.acquire()
        for q in self.threads.values():
            q.running = False
        self.bigLock.release()
        
    def bindIntoClass(self,fam,sub,classid):
        """
        Messages come in marked with fam and sub; we need to bind them into classes.
        AOL tells us what fam,sub combination goes to which class.
        """
        snacid=str(fam)+str(sub)
        classid=str(classid)
        self.snacs[snacid]=classid

    def setStat(self,classid,window=-1,clear=-1,alert=-1,limit=-1,disconnect=-1,rate=-1,lasttime=-1,maxrate=-1):
        """
        AOL also tells us what our limits are and what our current rate is.
        """
        classid=str(classid)
        target=clear
        self.bigLock.acquire()
        if (not self.threads.has_key(classid) or not self.threads[classid].isAlive()):
            self.threads[classid]=self.QueueThread(classid,self.handler,self.freezer)
        self.threads[classid].setStat(window=window,rate=rate,target=target,lasttime=lasttime,max=maxrate)
        self.bigLock.release()
        
    class QueueThread(threading.Thread):
            
        def __init__(self,name,handler,freezer):
            threading.Thread.__init__(self)
            self.running = True
            self.name=name
            self.handler=handler
            self.freezer=freezer
            if (freezer.has_key(name)):
                self.rm=freezer[name]
            else:
                self.rm=Scheduler.RateManager()
                self.freezer[name]=self.rm
            self.q=Queue.Queue()
            self.setDaemon(True)
            self.start()
    
        def run(self):
            while self.running:
               try:
                    snac=self.q.get(True,self.rm.waithint)
                    delay=self.rm.getDelay()
                    time.sleep(delay)
                    self.__process(snac)
               except Queue.Empty:
                    break

        def setStat(self,window=-1,rate=-1,target=-1,lasttime=-1,max=-1):
            self.rm.setStat(window=window,rate=rate,target=target,lasttime=lasttime,max=max)

        def enqueue(self,snac):
            self.q.put(snac)
                
        def __process(self,snac):
            self.handler(snac)
            self.rm.updateRate()
    
    class RateManager:
        #This class calculates the current rate and delay needed not to overrun a target rate. 
        #Remember, it's not "rate" so much as "average delay". It goes down as traffic increases!
        #
        #This class should be general enough to use with any scheduler.
    
        def __init__(self):
            self.lock=threading.RLock()
            self.waithint=60
            self.rate=-1
            self.target=-1
            self.window=-1
            self.lasttime=-1
            self.max=-1

        def setStat(self,window=-1,rate=-1,target=-1,lasttime=-1,max=-1):
            self.lock.acquire()
            if (window != -1):
                self.ratehint=window
                self.window=window
            if (rate != -1):
                self.rate=rate
            if (target != -1):
                self.target=target
            if (lasttime != -1):
                self.lasttime=lasttime
            if (max != -1):
                self.max=max
            self.lock.release()

        def getDelay(self):
            """
            Get the delay needed not to overrun target rate.
            """
            self.lock.acquire()
            nexttime=(self.window*self.target-(self.window-1)*self.rate)/1000.+self.lasttime
            now=time.time()
            self.lock.release()
            if (nexttime < now or self.rate == -1):
                return 0
            else:
                return (nexttime-now)
              
        def updateRate(self):
            """
            Record that a message has been sent and update data.
            """
            self.lock.acquire()
            if (self.window == -1):
                return
            now=time.time()
            newrate=(self.window-1.)/self.window * self.rate + 1./self.window * (now-self.lasttime)*1000
            if (newrate > self.max):
                self.rate=self.max
            else:
                self.rate=newrate
            self.lasttime=now
            self.lock.release()