This file is indexed.

/usr/share/pyshared/apache_openid/handlers/openidteams/mixins.py is in python-apache-openid 2.0.1-0ubuntu3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# Copyright (C) 2005 JanRain, Inc.
# Copyright (C) 2009, 2010 Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import datetime
import urllib
import urlparse

from apache_openid import logging


class SubstitutionMixin(object):

    def _substitutions(self):
        """A dictionary of substitutions.

        $-substitutions are allowed, using the CGI environment, plus the
        public attributes of the request object, plus session data, in that
        order of preference.
        """
        substitutions = self.request.subprocess_env.copy()
        for attribute in dir(self.request):
            if attribute.startswith('_'):
                continue
            value = getattr(self.request, attribute)
            if isinstance(value, basestring):
                substitutions[attribute] = value
        substitutions.update(self.session)
        self.debug('substitutions: %r', substitutions)
        return substitutions


class CookiedTeamsMixin(SubstitutionMixin):

    @property
    def cache_lifetime(self):
        """The team cache lifetime from the config options."""
        cache_lifetime = self.options.get('team-cache-lifetime')
        if cache_lifetime is None:
            logging.error('team-cache-lifetime should be set explicitly. Using default for now.')
            cache_lifetime = 43200
        try:
            cache_lifetime = int(cache_lifetime)
        except ValueError:
            logging.error('Bad team-cache-lifetime: %s', cache_lifetime)
            cache_lifetime = 43200
        logging.debug('team-cache-lifetime: %s', cache_lifetime)
        return datetime.timedelta(seconds=cache_lifetime)

    def _get_cookied_teams(self):
        """Get the list of Launchpad teams for the current user.
        The cookied teams list is cached for a period of time, specified by
        the team-cache-lifetime option.
        """
        # BarryWarsaw 18-Dec-2008 At one point I tried to use a separate class
        # to represent the cookied team cache, but that had several problems.
        # Because the session data is actually pickled into the dbm file, the
        # instance stored here must be picklable.  However, in order to do
        # logging in the cache instance, you'd have to pass a hook into that
        # instance which would not be picklable (e.g. an instancemethod or the
        # Apache request object).  So then you'd have to deal with the pickle
        # protocol and figure out a way to initialize that log hook when the
        # session is unpickled.  Or forgo logging.  Neither option is really
        # acceptable, so the choice was made to use a simpler inline cache.
        # However this means that we need to hijack assignment to
        # .cookied_teams in order to keep the cache consistent.
        team_cache = self.session.setdefault('cookied_teams', {})
        # Evict expired cache items.  Use .keys() to avoid
        # mutation-while-iterating exceptions.
        now = datetime.datetime.now()
        for team in team_cache.keys():
            if team_cache[team] < now:
                logging.debug('Evicting cookied team: %s', team)
                del team_cache[team]
        return team_cache.keys()

    def _set_cookied_teams(self, teams):
        """Set the list of Launchpad teams for the current user.
        We pervert the meaning of assignment here to be extension.  Use this
        instead of .extend() on self.cookied_teams because the latter will not
        be persistent.  Practicality beats purity.
        """
        expiration_date = datetime.datetime.now() + self.cache_lifetime
        team_cache = self.session.setdefault('cookied_teams', {})
        team_cache.update(dict((team, expiration_date) for team in teams))

    def _del_cookied_teams(self):
        """Delete the list of Launchpad teams for the current user."""
        self.session.setdefault('cookied_teams', {}).clear()

    cookied_teams = property(_get_cookied_teams, _set_cookied_teams,
                             _del_cookied_teams, _get_cookied_teams.__doc__)

    @property
    def authorized_teams(self):
        """Get all authorized teams for this request."""
        authorized_teams = []
        if 'authorized-teams' in self.options:
            authorized_teams.extend(self.get_authorized_teams_from_options())
        if 'authorized-teams-list-url' in self.options:
            url = self.options.get('authorized-teams-list-url')
            parts = urlparse.urlparse(url)
            # Use the url scheme to determine how to get the team list.  We
            # have a few custom ones (e.g. regexp: and xmlrpc:) and Python
            # itself provides most defaults through urllib.
            teams = self.get_authorized_teams_from_url(url)
            authorized_teams.extend(teams)
        return authorized_teams

    def get_authorized_teams_from_options(self):
        authorized_teams_raw = self.options.get('authorized-teams')
        return self.parse_authorized_teams(authorized_teams_raw)

    def get_authorized_teams_from_url(self, url):
        try:
            url_handle = urllib.urlopen(url)
            data = url_handle.read()
            url_handle.close()
            return self.parse_authorized_teams(data)
        except IOError:
            logging.error('Failed to fetch authorized list URL %r', url)
            return []

    def parse_authorized_teams(self, data):
        """Parse a string containing whitespace-separated team names."""
        teams = []
        for s in data.split():
            team = s.strip()
            if not team or team[0] == '#':
                continue
            teams.append(team)
        return teams