This file is indexed.

/usr/lib/python3/dist-packages/lib389/schema.py is in python3-lib389 1.3.7.10-1ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# --- BEGIN COPYRIGHT BLOCK ---
# Copyright (C) 2015 Red Hat, Inc.
# All rights reserved.
#
# License: GPL (version 3 or any later version).
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---

"""
   You will access this from:
   DirSrv.schema.methodName()
"""
import glob
import ldap
from ldap.schema.models import AttributeType, ObjectClass, MatchingRule

from lib389._constants import *
from lib389._constants import DN_SCHEMA
from lib389.utils import ds_is_newer
from lib389._mapped_object import DSLdapObject
from lib389.tasks import SchemaReloadTask


class Schema(DSLdapObject):
    """Mapped object for the schema entry located at DN_SCHEMA."""

    def __init__(self, instance, batch=False):
        """Bind to the schema entry of the given instance.

        @param instance - passed through to DSLdapObject
        @param batch - passed through to DSLdapObject
        """
        super(Schema, self).__init__(instance=instance, batch=batch)
        self._dn = DN_SCHEMA
        self._rdn_attribute = 'cn'

    def reload(self, schema_dir=None):
        """Create a schema reload task and return it.

        @param schema_dir - optional directory; when given it is passed to
                            the task as its 'schemadir' property
        @return the created SchemaReloadTask
        """
        # Only mention the directory when the caller actually supplied one.
        properties = {} if schema_dir is None else {'schemadir': schema_dir}

        reload_task = SchemaReloadTask(self._instance)
        reload_task.create(properties=properties)
        return reload_task


class SchemaLegacy(object):
    """Legacy helper to read and modify the schema through a connection.

    All LDAP operations are issued on the connection given to the
    constructor and target the entry at DN_SCHEMA.
    """

    def __init__(self, conn):
        """@param conn - a DirSrv instance"""
        self.conn = conn
        self.log = conn.log

    def _search_schema(self, attrs):
        """Run a base-scope search on the schema entry and return the entry.
        @param attrs - list of attribute names to request"""
        return self.conn.search_s(DN_SCHEMA, ldap.SCOPE_BASE,
                                  'objectclass=*', attrs)[0]

    @staticmethod
    def _find_unique(elements, name):
        """Return the single element whose names contain name, compared
        case-insensitively. Returns None when no element, or more than one
        element, matches.
        @param elements - objects exposing a 'names' sequence
        @param name - the name to look for"""
        wanted = name.lower()
        found = [element for element in elements
                 if wanted in [alias.lower() for alias in element.names]]
        if len(found) != 1:
            # Zero or several candidates: treated as an error by callers.
            return None
        return found[0]

    def get_entry(self):
        """get the schema as an LDAP entry"""
        return self._search_schema(['attributeTypes', 'objectClasses'])

    def get_subschema(self):
        """get the schema as a python-ldap SubSchema object"""
        return ldap.schema.SubSchema(self.get_entry().data)

    def list_files(self):
        """return a list of the schema files in the instance schemadir"""
        file_list = glob.glob(self.conn.schemadir + "/*.ldif")
        if ds_is_newer('1.3.6.0'):
            # These versions also have a system schema directory.
            file_list += glob.glob(
                self.conn.ds_paths.system_schema_dir + "/*.ldif")
        return file_list

    def file_to_ldap(self, filename):
        """convert the given schema file name to its python-ldap format
        suitable for passing to ldap.schema.SubSchema()
        @param filename - the full path and filename of a schema file in ldif
        format
        @return the attribute dict of the first record in the file, or None
        if the file holds no record"""
        import ldif

        # Open the path directly instead of going through a file:// URL:
        # the handle is closed deterministically, and paths containing
        # URL-special characters ('#', '?', '%') are read correctly.
        # Binary mode matches the byte stream urlopen() used to provide.
        with open(filename, 'rb') as ldif_file:
            ldif_parser = ldif.LDIFRecordList(ldif_file, max_entries=1)
            ldif_parser.parse()
        if not ldif_parser.all_records:
            return None
        # Each record is a (dn, attributes) pair; only attributes are needed.
        return ldif_parser.all_records[0][1]

    def file_to_subschema(self, filename):
        """convert the given schema file name to its python-ldap format
        ldap.schema.SubSchema object
        @param filename - the full path and filename of a schema file in ldif
        format
        @return a SubSchema, or None if the file holds no record"""
        ent = self.file_to_ldap(filename)
        if not ent:
            return None
        return ldap.schema.SubSchema(ent)

    def add_schema(self, attr, val):
        """Add a schema element to the schema.
        @param attr - the attribute type to use e.g. attributeTypes or
                      objectClasses
        @param val - the schema element definition to add"""
        self.conn.modify_s(DN_SCHEMA, [(ldap.MOD_ADD, attr, val)])

    def del_schema(self, attr, val):
        """Delete a schema element from the schema.
        @param attr - the attribute type to use e.g. attributeTypes or
                      objectClasses
        @param val - the schema element definition to delete"""
        self.conn.modify_s(DN_SCHEMA, [(ldap.MOD_DELETE, attr, val)])

    def add_attribute(self, *attributes):
        """Add attribute type definitions to the schema.
        @param attributes - one or more attribute type definitions, each
                            passed as its own positional argument
        """
        return self.add_schema('attributeTypes', attributes)

    def add_objectclass(self, *objectclasses):
        """Add object class definitions to the schema.
        @param objectclasses - one or more object class definitions, each
                               passed as its own positional argument
        """
        return self.add_schema('objectClasses', objectclasses)

    def get_schema_csn(self):
        """return the schema nsSchemaCSN attribute"""
        return self._search_schema(['nsSchemaCSN']).getValue('nsSchemaCSN')

    def get_objectclasses(self):
        """Returns a list of ldap.schema.models.ObjectClass objects for all
        objectClasses supported by this instance.
        """
        results = self._search_schema(['objectClasses'])
        return [ObjectClass(oc) for oc in results.getValues('objectClasses')]

    def get_attributetypes(self):
        """Returns a list of ldap.schema.models.AttributeType objects for all
        attributeTypes supported by this instance.
        """
        results = self._search_schema(['attributeTypes'])
        return [AttributeType(at)
                for at in results.getValues('attributeTypes')]

    def get_matchingrules(self):
        """Return a list of the server defined matching rules"""
        results = self._search_schema(['matchingrules'])
        return [MatchingRule(mr) for mr in results.getValues('matchingRules')]

    def query_matchingrule(self, mr_name):
        """Returns a single matching rule instance that matches the mr_name.
        Returns None if the matching rule doesn't exist, or if the name is
        ambiguous.

        @param mr_name - The name of the matching rule you want to query.

        return MatchingRule or None

        <ldap.schema.models.MatchingRule instance>
        """
        return self._find_unique(self.get_matchingrules(), mr_name)

    def query_objectclass(self, objectclassname):
        """Returns a single ObjectClass instance that matches objectclassname.
        Returns None if the objectClass doesn't exist, or if the name is
        ambiguous.

        @param objectclassname - The name of the objectClass you want to query.

        return ObjectClass or None

        ex. query_objectclass('account')
        <ldap.schema.models.ObjectClass instance>
        """
        return self._find_unique(self.get_objectclasses(), objectclassname)

    def query_attributetype(self, attributetypename):
        """Returns a tuple of the AttributeType, and what objectclasses may or
        must take this attributeType. Returns None if attributetype doesn't
        exist, or if the name is ambiguous.

        An objectclass is reported when its MUST/MAY list mentions any of
        the names of the attribute type, not only the primary one.

        @param attributetypename - The name of the attributeType you want to
        query

        return (AttributeType, Must, May) or None

        ex. query_attributetype('uid')
        ( <ldap.schema.models.AttributeType instance>,
         [<ldap.schema.models.ObjectClass instance>, ...],
         [<ldap.schema.models.ObjectClass instance>, ...] )
        """
        # There is no way to search by alternate name on the server, so
        # filter the full set of attribute types locally.
        attributetype = self._find_unique(self.get_attributetypes(),
                                          attributetypename)
        if attributetype is None:
            return None

        # Only fetch the objectclasses once the attribute is known to exist.
        objectclasses = self.get_objectclasses()

        # An objectclass may refer to the attribute by any of its names.
        aliases = set(alias.lower() for alias in attributetype.names)

        def refers_to_attribute(attr_names):
            return any(attr.lower() in aliases for attr in attr_names)

        may = [oc for oc in objectclasses if refers_to_attribute(oc.may)]
        must = [oc for oc in objectclasses if refers_to_attribute(oc.must)]
        return (attributetype, must, may)